|
|
|
@ -2,19 +2,23 @@ use crate::api::v1alpha1::configsets_api::{
|
|
|
|
|
ConfigSet, Input, InputWithName, TargetWithName, Templates,
|
|
|
|
|
};
|
|
|
|
|
use core::fmt;
|
|
|
|
|
use futures::StreamExt;
|
|
|
|
|
use futures::{StreamExt, TryStreamExt, stream};
|
|
|
|
|
use handlebars::Handlebars;
|
|
|
|
|
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
|
|
|
|
|
use k8s_openapi::api::core::v1::{ConfigMap, Namespace, Secret};
|
|
|
|
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
|
|
|
|
|
use k8s_openapi::{ByteString, NamespaceResourceScope};
|
|
|
|
|
use kube::api::{ListParams, PostParams};
|
|
|
|
|
use kube::core::{Object, ObjectMeta};
|
|
|
|
|
use kube::api::{GroupVersionKind, ListParams, PostParams, WatchEvent};
|
|
|
|
|
use kube::core::{DynamicResourceScope, Object, ObjectMeta};
|
|
|
|
|
use kube::error::ErrorResponse;
|
|
|
|
|
use kube::runtime::controller::Action;
|
|
|
|
|
use kube::runtime::finalizer::Event as Finalizer;
|
|
|
|
|
use kube::runtime::watcher::Config;
|
|
|
|
|
use kube::runtime::{finalizer, Controller};
|
|
|
|
|
use kube::runtime::{
|
|
|
|
|
watcher::{self, Config},
|
|
|
|
|
WatchStreamExt,
|
|
|
|
|
};
|
|
|
|
|
use kube::{Api, Client, CustomResource};
|
|
|
|
|
use kube_client::api::WatchParams;
|
|
|
|
|
use kube_client::core::DynamicObject;
|
|
|
|
|
use kube_client::{Resource, ResourceExt};
|
|
|
|
|
use log::*;
|
|
|
|
@ -94,13 +98,18 @@ pub async fn setup() {
|
|
|
|
|
let client = Client::try_default()
|
|
|
|
|
.await
|
|
|
|
|
.expect("failed to create kube Client");
|
|
|
|
|
let docs = Api::<ConfigSet>::all(client.clone());
|
|
|
|
|
if let Err(e) = docs.list(&ListParams::default().limit(1)).await {
|
|
|
|
|
let confsets = Api::<ConfigSet>::all(client.clone());
|
|
|
|
|
if let Err(e) = confsets.list(&ListParams::default().limit(1)).await {
|
|
|
|
|
error!("{}", e);
|
|
|
|
|
std::process::exit(1);
|
|
|
|
|
}
|
|
|
|
|
let ctx = Arc::new(Context { client });
|
|
|
|
|
Controller::new(docs, Config::default().any_semantic())
|
|
|
|
|
|
|
|
|
|
tokio::spawn(setup_secret_watcher(client.clone()));
|
|
|
|
|
|
|
|
|
|
let ctx = Arc::new(Context {
|
|
|
|
|
client: client.clone(),
|
|
|
|
|
});
|
|
|
|
|
Controller::new(confsets, Config::default().any_semantic())
|
|
|
|
|
.shutdown_on_signal()
|
|
|
|
|
.run(reconcile, error_policy, ctx)
|
|
|
|
|
.filter_map(|x| async move { std::result::Result::ok(x) })
|
|
|
|
@ -108,6 +117,107 @@ pub async fn setup() {
|
|
|
|
|
.await;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn setup_secret_watcher(client: Client) {
|
|
|
|
|
let namespace_api: Api<Namespace> = Api::all(client.clone());
|
|
|
|
|
let stream = match namespace_api.watch(&WatchParams::default(), "0").await {
|
|
|
|
|
Ok(status) => {
|
|
|
|
|
let mut status_box = status.boxed();
|
|
|
|
|
while let Some(status) = status_box.try_next().await.unwrap() {
|
|
|
|
|
match status {
|
|
|
|
|
WatchEvent::Added(ns) => {
|
|
|
|
|
info!("namespace is being watched {}", ns.name_any());
|
|
|
|
|
let sec_api = get_secret_api(client.clone(), ns.name_any());
|
|
|
|
|
let cm_api = get_configmap_api(client.clone(), ns.name_any());
|
|
|
|
|
tokio::spawn( async move {
|
|
|
|
|
let stream = match sec_api.watch(&WatchParams::default(), "0").await {
|
|
|
|
|
Ok(status) => {
|
|
|
|
|
let mut status_box = status.boxed();
|
|
|
|
|
while let Some(status) = status_box.try_next().await.unwrap() {
|
|
|
|
|
match status {
|
|
|
|
|
WatchEvent::Modified(secret) => {
|
|
|
|
|
if is_shu_object(secret.metadata.clone()) {
|
|
|
|
|
let config_set_name = secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
|
|
|
|
|
info!(
|
|
|
|
|
"secret {} was updated, triggering configset reconciliation",
|
|
|
|
|
secret.metadata.clone().name.unwrap(),
|
|
|
|
|
);
|
|
|
|
|
watcher_trigger_reconcile(secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), secret.metadata.namespace.unwrap()).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_ => {},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(err) => {
|
|
|
|
|
error!("couldn't start secrets watcher");
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
});
|
|
|
|
|
tokio::spawn( async move {
|
|
|
|
|
let stream = match cm_api.watch(&WatchParams::default(), "0").await {
|
|
|
|
|
Ok(status) => {
|
|
|
|
|
let mut status_box = status.boxed();
|
|
|
|
|
while let Some(status) = status_box.try_next().await.unwrap() {
|
|
|
|
|
match status {
|
|
|
|
|
WatchEvent::Modified(configmap) => {
|
|
|
|
|
if is_shu_object(configmap.metadata.clone()) {
|
|
|
|
|
let config_set_name = configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
|
|
|
|
|
info!(
|
|
|
|
|
"configmap {} was updated, triggering configset reconciliation",
|
|
|
|
|
configmap.metadata.clone().name.unwrap(),
|
|
|
|
|
);
|
|
|
|
|
watcher_trigger_reconcile(configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), configmap.metadata.namespace.unwrap()).await;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
_ => {},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(err) => {
|
|
|
|
|
error!("couldn't start secrets watcher");
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
_ => {},
|
|
|
|
|
// WatchEvent::Deleted(_) => todo!(),
|
|
|
|
|
// WatchEvent::Bookmark(_) => todo!(),
|
|
|
|
|
// WatchEvent::Error(_) => todo!(),
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
},
|
|
|
|
|
Err(err) => {
|
|
|
|
|
error!("couldn't watch namespaces, watcher is not enabled");
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn is_shu_object(metadata: ObjectMeta) -> bool {
|
|
|
|
|
match metadata.annotations {
|
|
|
|
|
Some(annotations) => match annotations.get(WATCHED_BY_SHU) {
|
|
|
|
|
Some(_) => true,
|
|
|
|
|
None => false,
|
|
|
|
|
},
|
|
|
|
|
None => false,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn watcher_trigger_reconcile(config_set_name: String, namespace: String) {
|
|
|
|
|
let client = Client::try_default().await.expect("failed to create kube Client");
|
|
|
|
|
let api: Api<ConfigSet> = Api::namespaced(client.clone(), &namespace);
|
|
|
|
|
let config_set = match api.get(&config_set_name).await {
|
|
|
|
|
Ok(res) => {
|
|
|
|
|
if let Err(err) = reconcile(Arc::new(res), Arc::new(Context {client })).await {
|
|
|
|
|
error!("an error occured during reconciliation: {}", err);
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
Err(err) => {
|
|
|
|
|
error!("couldn't get configset: {}", err);
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn error_policy(doc: Arc<ConfigSet>, error: &Error, ctx: Arc<Context>) -> Action {
|
|
|
|
|
Action::requeue(Duration::from_secs(5 * 60))
|
|
|
|
|
}
|
|
|
|
@ -122,8 +232,10 @@ fn get_configmap_api(client: Client, namespace: String) -> Api<ConfigMap> {
|
|
|
|
|
|
|
|
|
|
async fn gather_inputs(
|
|
|
|
|
client: Client,
|
|
|
|
|
config_set_name: String,
|
|
|
|
|
namespace: String,
|
|
|
|
|
inputs: Vec<InputWithName>,
|
|
|
|
|
is_removed: bool,
|
|
|
|
|
) -> Result<HashMap<String, String>> {
|
|
|
|
|
let mut result: HashMap<String, String> = HashMap::new();
|
|
|
|
|
for i in inputs {
|
|
|
|
@ -148,8 +260,19 @@ async fn gather_inputs(
|
|
|
|
|
))))
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
// Take dynamic resource identifiers:
|
|
|
|
|
|
|
|
|
|
// Use the full resource info to create an Api with the ApiResource as its DynamicType
|
|
|
|
|
let api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
|
|
|
|
|
let a: Api<DynamicObject> = api.try_into()?;
|
|
|
|
|
|
|
|
|
|
match is_removed {
|
|
|
|
|
true => unset_annotations(, i.from.key, config_set_name).await?,
|
|
|
|
|
false => set_annotations(api, i.from.key, config_set_name).await?,
|
|
|
|
|
};
|
|
|
|
|
value.to_string()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Err(err) => {
|
|
|
|
|
error!("{err}");
|
|
|
|
|
return Err(Error::KubeError(err));
|
|
|
|
@ -187,6 +310,49 @@ async fn gather_inputs(
|
|
|
|
|
Ok(result)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn set_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
|
|
|
|
|
let object = match api.get_opt(&object_name).await {
|
|
|
|
|
Ok(object) => match object {
|
|
|
|
|
Some(object) => object,
|
|
|
|
|
None => return Ok(()),
|
|
|
|
|
},
|
|
|
|
|
Err(err) => return Err(Error::KubeError(err)),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut annotations = match object.clone().metadata.annotations {
|
|
|
|
|
Some(annotations) => annotations,
|
|
|
|
|
None => BTreeMap::new(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
|
|
|
|
|
if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await {
|
|
|
|
|
return Err(Error::KubeError(err));
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn unset_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
|
|
|
|
|
let object = match api.get_opt(&object_name).await {
|
|
|
|
|
Ok(object) => match object {
|
|
|
|
|
Some(object) => object,
|
|
|
|
|
None => return Ok(()),
|
|
|
|
|
},
|
|
|
|
|
Err(err) => return Err(Error::KubeError(err)),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut annotations = match object.clone().metadata.annotations {
|
|
|
|
|
Some(annotations) => annotations,
|
|
|
|
|
None => BTreeMap::new(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
|
|
|
|
|
if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await {
|
|
|
|
|
return Err(Error::KubeError(err));
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async fn gather_targets(
|
|
|
|
|
client: Client,
|
|
|
|
|
namespace: String,
|
|
|
|
@ -399,8 +565,10 @@ impl ConfigSet {
|
|
|
|
|
|
|
|
|
|
let inputs: HashMap<String, String> = gather_inputs(
|
|
|
|
|
ctx.client.clone(),
|
|
|
|
|
self.name_any(),
|
|
|
|
|
self.metadata.namespace.clone().unwrap(),
|
|
|
|
|
self.spec.inputs.clone(),
|
|
|
|
|
false,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
@ -470,8 +638,10 @@ impl ConfigSet {
|
|
|
|
|
async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
|
|
|
|
|
let inputs: HashMap<String, String> = gather_inputs(
|
|
|
|
|
ctx.client.clone(),
|
|
|
|
|
self.name_any(),
|
|
|
|
|
self.metadata.namespace.clone().unwrap(),
|
|
|
|
|
self.spec.inputs.clone(),
|
|
|
|
|
true,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|