Compare commits
1 Commits
main
...
add-watche
Author | SHA1 | Date | |
---|---|---|---|
33c18ff340
|
@ -24,6 +24,7 @@ use thiserror::Error;
|
|||||||
version = "v1alpha1",
|
version = "v1alpha1",
|
||||||
namespaced
|
namespaced
|
||||||
)]
|
)]
|
||||||
|
|
||||||
#[kube(status = "ConfigSetStatus", shortname = "confset")]
|
#[kube(status = "ConfigSetStatus", shortname = "confset")]
|
||||||
pub struct ConfigSetSpec {
|
pub struct ConfigSetSpec {
|
||||||
pub targets: Vec<TargetWithName>,
|
pub targets: Vec<TargetWithName>,
|
||||||
|
@ -2,19 +2,23 @@ use crate::api::v1alpha1::configsets_api::{
|
|||||||
ConfigSet, Input, InputWithName, TargetWithName, Templates,
|
ConfigSet, Input, InputWithName, TargetWithName, Templates,
|
||||||
};
|
};
|
||||||
use core::fmt;
|
use core::fmt;
|
||||||
use futures::StreamExt;
|
use futures::{StreamExt, TryStreamExt, stream};
|
||||||
use handlebars::Handlebars;
|
use handlebars::Handlebars;
|
||||||
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
|
use k8s_openapi::api::core::v1::{ConfigMap, Namespace, Secret};
|
||||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
|
||||||
use k8s_openapi::{ByteString, NamespaceResourceScope};
|
use k8s_openapi::{ByteString, NamespaceResourceScope};
|
||||||
use kube::api::{ListParams, PostParams};
|
use kube::api::{GroupVersionKind, ListParams, PostParams, WatchEvent};
|
||||||
use kube::core::{Object, ObjectMeta};
|
use kube::core::{DynamicResourceScope, Object, ObjectMeta};
|
||||||
use kube::error::ErrorResponse;
|
use kube::error::ErrorResponse;
|
||||||
use kube::runtime::controller::Action;
|
use kube::runtime::controller::Action;
|
||||||
use kube::runtime::finalizer::Event as Finalizer;
|
use kube::runtime::finalizer::Event as Finalizer;
|
||||||
use kube::runtime::watcher::Config;
|
|
||||||
use kube::runtime::{finalizer, Controller};
|
use kube::runtime::{finalizer, Controller};
|
||||||
|
use kube::runtime::{
|
||||||
|
watcher::{self, Config},
|
||||||
|
WatchStreamExt,
|
||||||
|
};
|
||||||
use kube::{Api, Client, CustomResource};
|
use kube::{Api, Client, CustomResource};
|
||||||
|
use kube_client::api::WatchParams;
|
||||||
use kube_client::core::DynamicObject;
|
use kube_client::core::DynamicObject;
|
||||||
use kube_client::{Resource, ResourceExt};
|
use kube_client::{Resource, ResourceExt};
|
||||||
use log::*;
|
use log::*;
|
||||||
@ -94,13 +98,18 @@ pub async fn setup() {
|
|||||||
let client = Client::try_default()
|
let client = Client::try_default()
|
||||||
.await
|
.await
|
||||||
.expect("failed to create kube Client");
|
.expect("failed to create kube Client");
|
||||||
let docs = Api::<ConfigSet>::all(client.clone());
|
let confsets = Api::<ConfigSet>::all(client.clone());
|
||||||
if let Err(e) = docs.list(&ListParams::default().limit(1)).await {
|
if let Err(e) = confsets.list(&ListParams::default().limit(1)).await {
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
let ctx = Arc::new(Context { client });
|
|
||||||
Controller::new(docs, Config::default().any_semantic())
|
tokio::spawn(setup_secret_watcher(client.clone()));
|
||||||
|
|
||||||
|
let ctx = Arc::new(Context {
|
||||||
|
client: client.clone(),
|
||||||
|
});
|
||||||
|
Controller::new(confsets, Config::default().any_semantic())
|
||||||
.shutdown_on_signal()
|
.shutdown_on_signal()
|
||||||
.run(reconcile, error_policy, ctx)
|
.run(reconcile, error_policy, ctx)
|
||||||
.filter_map(|x| async move { std::result::Result::ok(x) })
|
.filter_map(|x| async move { std::result::Result::ok(x) })
|
||||||
@ -108,6 +117,107 @@ pub async fn setup() {
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn setup_secret_watcher(client: Client) {
|
||||||
|
let namespace_api: Api<Namespace> = Api::all(client.clone());
|
||||||
|
let stream = match namespace_api.watch(&WatchParams::default(), "0").await {
|
||||||
|
Ok(status) => {
|
||||||
|
let mut status_box = status.boxed();
|
||||||
|
while let Some(status) = status_box.try_next().await.unwrap() {
|
||||||
|
match status {
|
||||||
|
WatchEvent::Added(ns) => {
|
||||||
|
info!("namespace is being watched {}", ns.name_any());
|
||||||
|
let sec_api = get_secret_api(client.clone(), ns.name_any());
|
||||||
|
let cm_api = get_configmap_api(client.clone(), ns.name_any());
|
||||||
|
tokio::spawn( async move {
|
||||||
|
let stream = match sec_api.watch(&WatchParams::default(), "0").await {
|
||||||
|
Ok(status) => {
|
||||||
|
let mut status_box = status.boxed();
|
||||||
|
while let Some(status) = status_box.try_next().await.unwrap() {
|
||||||
|
match status {
|
||||||
|
WatchEvent::Modified(secret) => {
|
||||||
|
if is_shu_object(secret.metadata.clone()) {
|
||||||
|
let config_set_name = secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
|
||||||
|
info!(
|
||||||
|
"secret {} was updated, triggering configset reconciliation",
|
||||||
|
secret.metadata.clone().name.unwrap(),
|
||||||
|
);
|
||||||
|
watcher_trigger_reconcile(secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), secret.metadata.namespace.unwrap()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("couldn't start secrets watcher");
|
||||||
|
},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
tokio::spawn( async move {
|
||||||
|
let stream = match cm_api.watch(&WatchParams::default(), "0").await {
|
||||||
|
Ok(status) => {
|
||||||
|
let mut status_box = status.boxed();
|
||||||
|
while let Some(status) = status_box.try_next().await.unwrap() {
|
||||||
|
match status {
|
||||||
|
WatchEvent::Modified(configmap) => {
|
||||||
|
if is_shu_object(configmap.metadata.clone()) {
|
||||||
|
let config_set_name = configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
|
||||||
|
info!(
|
||||||
|
"configmap {} was updated, triggering configset reconciliation",
|
||||||
|
configmap.metadata.clone().name.unwrap(),
|
||||||
|
);
|
||||||
|
watcher_trigger_reconcile(configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), configmap.metadata.namespace.unwrap()).await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("couldn't start secrets watcher");
|
||||||
|
},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
_ => {},
|
||||||
|
// WatchEvent::Deleted(_) => todo!(),
|
||||||
|
// WatchEvent::Bookmark(_) => todo!(),
|
||||||
|
// WatchEvent::Error(_) => todo!(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
error!("couldn't watch namespaces, watcher is not enabled");
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_shu_object(metadata: ObjectMeta) -> bool {
|
||||||
|
match metadata.annotations {
|
||||||
|
Some(annotations) => match annotations.get(WATCHED_BY_SHU) {
|
||||||
|
Some(_) => true,
|
||||||
|
None => false,
|
||||||
|
},
|
||||||
|
None => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn watcher_trigger_reconcile(config_set_name: String, namespace: String) {
|
||||||
|
let client = Client::try_default().await.expect("failed to create kube Client");
|
||||||
|
let api: Api<ConfigSet> = Api::namespaced(client.clone(), &namespace);
|
||||||
|
let config_set = match api.get(&config_set_name).await {
|
||||||
|
Ok(res) => {
|
||||||
|
if let Err(err) = reconcile(Arc::new(res), Arc::new(Context {client })).await {
|
||||||
|
error!("an error occured during reconciliation: {}", err);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
error!("couldn't get configset: {}", err);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
fn error_policy(doc: Arc<ConfigSet>, error: &Error, ctx: Arc<Context>) -> Action {
|
fn error_policy(doc: Arc<ConfigSet>, error: &Error, ctx: Arc<Context>) -> Action {
|
||||||
Action::requeue(Duration::from_secs(5 * 60))
|
Action::requeue(Duration::from_secs(5 * 60))
|
||||||
}
|
}
|
||||||
@ -122,8 +232,10 @@ fn get_configmap_api(client: Client, namespace: String) -> Api<ConfigMap> {
|
|||||||
|
|
||||||
async fn gather_inputs(
|
async fn gather_inputs(
|
||||||
client: Client,
|
client: Client,
|
||||||
|
config_set_name: String,
|
||||||
namespace: String,
|
namespace: String,
|
||||||
inputs: Vec<InputWithName>,
|
inputs: Vec<InputWithName>,
|
||||||
|
is_removed: bool,
|
||||||
) -> Result<HashMap<String, String>> {
|
) -> Result<HashMap<String, String>> {
|
||||||
let mut result: HashMap<String, String> = HashMap::new();
|
let mut result: HashMap<String, String> = HashMap::new();
|
||||||
for i in inputs {
|
for i in inputs {
|
||||||
@ -148,8 +260,19 @@ async fn gather_inputs(
|
|||||||
))))
|
))))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
// Take dynamic resource identifiers:
|
||||||
|
|
||||||
|
// Use the full resource info to create an Api with the ApiResource as its DynamicType
|
||||||
|
let api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
|
||||||
|
let a: Api<DynamicObject> = api.try_into()?;
|
||||||
|
|
||||||
|
match is_removed {
|
||||||
|
true => unset_annotations(, i.from.key, config_set_name).await?,
|
||||||
|
false => set_annotations(api, i.from.key, config_set_name).await?,
|
||||||
|
};
|
||||||
value.to_string()
|
value.to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("{err}");
|
error!("{err}");
|
||||||
return Err(Error::KubeError(err));
|
return Err(Error::KubeError(err));
|
||||||
@ -187,6 +310,49 @@ async fn gather_inputs(
|
|||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn set_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
|
||||||
|
let object = match api.get_opt(&object_name).await {
|
||||||
|
Ok(object) => match object {
|
||||||
|
Some(object) => object,
|
||||||
|
None => return Ok(()),
|
||||||
|
},
|
||||||
|
Err(err) => return Err(Error::KubeError(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut annotations = match object.clone().metadata.annotations {
|
||||||
|
Some(annotations) => annotations,
|
||||||
|
None => BTreeMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
|
||||||
|
if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await {
|
||||||
|
return Err(Error::KubeError(err));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn unset_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
|
||||||
|
let object = match api.get_opt(&object_name).await {
|
||||||
|
Ok(object) => match object {
|
||||||
|
Some(object) => object,
|
||||||
|
None => return Ok(()),
|
||||||
|
},
|
||||||
|
Err(err) => return Err(Error::KubeError(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut annotations = match object.clone().metadata.annotations {
|
||||||
|
Some(annotations) => annotations,
|
||||||
|
None => BTreeMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
|
||||||
|
if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await {
|
||||||
|
return Err(Error::KubeError(err));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn gather_targets(
|
async fn gather_targets(
|
||||||
client: Client,
|
client: Client,
|
||||||
namespace: String,
|
namespace: String,
|
||||||
@ -399,8 +565,10 @@ impl ConfigSet {
|
|||||||
|
|
||||||
let inputs: HashMap<String, String> = gather_inputs(
|
let inputs: HashMap<String, String> = gather_inputs(
|
||||||
ctx.client.clone(),
|
ctx.client.clone(),
|
||||||
|
self.name_any(),
|
||||||
self.metadata.namespace.clone().unwrap(),
|
self.metadata.namespace.clone().unwrap(),
|
||||||
self.spec.inputs.clone(),
|
self.spec.inputs.clone(),
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@ -470,8 +638,10 @@ impl ConfigSet {
|
|||||||
async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
|
async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
|
||||||
let inputs: HashMap<String, String> = gather_inputs(
|
let inputs: HashMap<String, String> = gather_inputs(
|
||||||
ctx.client.clone(),
|
ctx.client.clone(),
|
||||||
|
self.name_any(),
|
||||||
self.metadata.namespace.clone().unwrap(),
|
self.metadata.namespace.clone().unwrap(),
|
||||||
self.spec.inputs.clone(),
|
self.spec.inputs.clone(),
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -79,6 +79,16 @@ fn prepare_cluster_role(namespace: String) -> ClusterRole {
|
|||||||
],
|
],
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
|
PolicyRule {
|
||||||
|
api_groups: Some(vec!["".to_string()]),
|
||||||
|
resources: Some(vec!["namespaces".to_string()]),
|
||||||
|
verbs: vec![
|
||||||
|
"get".to_string(),
|
||||||
|
"list".to_string(),
|
||||||
|
"watch".to_string()
|
||||||
|
],
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
];
|
];
|
||||||
|
|
||||||
ClusterRole {
|
ClusterRole {
|
||||||
|
@ -52,6 +52,6 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -39,8 +39,8 @@ spec:
|
|||||||
template: "{{ PROTO }}:{{ USERNAME }}:{{ PASSWORD }}/{{ DATABASE }}"
|
template: "{{ PROTO }}:{{ USERNAME }}:{{ PASSWORD }}/{{ DATABASE }}"
|
||||||
target: app-connection-string
|
target: app-connection-string
|
||||||
- name: EXISTING
|
- name: EXISTING
|
||||||
template: TEST
|
template: "{{ PROTO }}"
|
||||||
target: existing-target
|
target: app-connection-string
|
||||||
- name: IS_POSTGRES
|
- name: IS_POSTGRES
|
||||||
template: |
|
template: |
|
||||||
{{#if (eq PROTO "postgresql") }}
|
{{#if (eq PROTO "postgresql") }}
|
||||||
|
Reference in New Issue
Block a user