WIP: Adding first controller

Signed-off-by: Nikolai Rodionov <iam@allanger.xyz>
This commit is contained in:
2026-03-11 13:45:06 +01:00
parent ed3cada4df
commit 9f5d105f54
12 changed files with 587 additions and 53 deletions

View File

@@ -0,0 +1,10 @@
---
apiVersion: s3.badhouseplants.net/v1beta1
kind: S3Bucket
metadata:
name: test
namespace: default
spec:
instance: test
cleanup: false
ownConfigmap: false

View File

@@ -13,8 +13,9 @@ kind: S3Instance
metadata:
name: test
spec:
endpoint: rustfs.badhouseplants.net:443
region: us-east1
endpoint: https://rustfs.badhouseplants.net
forcePathStyle: true
region: us-east-1
credentialsSecret:
namespace: default
name: test

View File

@@ -1 +1,2 @@
pub mod s3_bucket;
pub mod s3_instance;

View File

@@ -0,0 +1,49 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
use k8s_openapi::serde::{Deserialize, Serialize};
use kube::CustomResource;
use kube::{self};
use schemars::JsonSchema;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
kind = "S3Bucket",
group = "s3.badhouseplants.net",
version = "v1beta1",
shortname = "bucket",
doc = "Manage buckets on the s3 instance",
namespaced,
status = "S3BucketStatus",
printcolumn = r#"{"name":"Instance","type":"string","description":"On which instance this bucket is created","jsonPath":".spec.instance"}"#,
printcolumn = r#"{"name":"Region","type":"string","description":"The region of the bucket","jsonPath":".status.region"}"#,
printcolumn = r#"{"name":"Total Objects","type":"number","description":"How many objects are there in the bucket","jsonPath":".status.total_objects"}"#,
printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct S3BucketSpec {
/// On which instance this bucket should be created
pub instance: String,
/// Should perform a cleanup on delete?
/// It will remove all objects from the bucket
#[serde(default)]
pub cleanup: bool,
/// Should set the owner reference on the CM
#[serde(default)]
pub own_configmap: bool,
}
/// The status object of `DbInstance`
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
pub struct S3BucketStatus {
/// Is this bucket ready.
#[serde(default)]
pub ready: bool,
pub conditions: Vec<Condition>,
#[serde(default)]
pub size: Option<u32>,
#[serde(default)]
pub objects_buckets: Option<u32>,
#[serde(default)]
pub endpoint: Option<String>,
#[serde(default)]
pub region: Option<String>,
}

View File

@@ -14,9 +14,9 @@ use schemars::JsonSchema;
status = "S3InstanceStatus",
printcolumn = r#"{"name":"Endpoint","type":"string","description":"The URL of the instance","jsonPath":".spec.endpoint"}"#,
printcolumn = r#"{"name":"Region","type":"string","description":"The region of the instance","jsonPath":".spec.region"}"#,
printcolumn = r#"{"name":"Force Path Style","type":"boolean","description":"Is forcing path style","jsonPath":".spec.forcePathStyle"}"#,
printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#,
printcolumn = r#"{"name":"Total Buckets","type":"number","description":"How many buckets are there on the instance","jsonPath":".status.total_buckets"}"#
printcolumn = r#"{"name":"Path Style","type":"boolean","description":"Is forcing path style","jsonPath":".spec.forcePathStyle"}"#,
printcolumn = r#"{"name":"Total Buckets","type":"number","description":"How many buckets are there on the instance","jsonPath":".status.total_buckets"}"#,
printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct S3InstanceSpec {

View File

@@ -1,19 +1,12 @@
mod conditions;
mod controllers;
mod s3;
use std::sync::Arc;
use self::controllers::s3_instance::{State, error_policy, reconcile, run};
use actix_web::web::Data;
use crate::controllers::{s3_bucket, s3_instance};
use actix_web::{App, HttpRequest, HttpResponse, HttpServer, Responder, get, middleware};
use api::api::v1beta1::s3_instance::S3Instance;
use clap::Parser;
use futures::StreamExt;
use kube::api::ListParams;
use kube::runtime::Controller;
use kube::runtime::events::{Recorder, Reporter};
use kube::runtime::watcher::Config;
use kube::{Api, Client};
use kube::Client;
use tracing_subscriber::EnvFilter;
/// Simple program to greet a person
@@ -42,18 +35,15 @@ async fn health(_: HttpRequest) -> impl Responder {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
//
// Initiatilize Kubernetes controller state
//
tracing_subscriber::fmt()
.json()
.with_env_filter(EnvFilter::from_default_env())
.init();
let state = State::default();
let client = Client::try_default()
.await
.expect("failed to create kube Client");
let dbin_controller = run(client, state);
let s3in_controller = s3_instance::run(client.clone());
let s3bucket_controller = s3_bucket::run(client.clone());
// Start web server
let server = HttpServer::new(move || {
App::new()
@@ -64,6 +54,6 @@ async fn main() -> anyhow::Result<()> {
.shutdown_timeout(5);
// Both runtimes implements graceful shutdown, so poll until both are done
tokio::join!(dbin_controller, server.run()).1?;
tokio::join!(s3in_controller, s3bucket_controller, server.run()).2?;
Ok(())
}

View File

@@ -1 +1,2 @@
pub(crate) mod s3_bucket;
pub(crate) mod s3_instance;

View File

@@ -0,0 +1,422 @@
use crate::conditions::set_condition;
use crate::controllers::s3_instance;
use crate::s3::S3Client;
use crate::s3::s3::S3Api;
use api::api::v1beta1::s3_bucket::{S3Bucket, S3BucketStatus};
use api::api::v1beta1::s3_instance::S3Instance;
use futures::StreamExt;
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
use kube::api::{ListParams, ObjectMeta, PostParams};
use kube::runtime::Controller;
use kube::runtime::controller::Action;
use kube::runtime::events::Recorder;
use kube::runtime::watcher::Config;
use kube::{Api, Client, Error, Resource, ResourceExt};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::*;
const TYPE_INSTANCE_CONNECTED: &str = "InstanceConnected";
const TYPE_CONFIGMAP_READY: &str = "ConfigMapReady";
const TYPE_BUCKET_READY: &str = "BucketReady";
const FIN_CLEANUP: &str = "s3.badhouseplants.net/bucket-cleanup";
const CONFIGMAP_LABEL: &str = "s3.badhouseplants.net/s3-bucket";
const AWS_REGION: &str = "AWS_REGION";
const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL";
#[instrument(skip(ctx, obj), fields(trace_id))]
pub(crate) async fn reconcile(obj: Arc<S3Bucket>, ctx: Arc<Context>) -> S3BucketResult<Action> {
info!("Staring reconciling");
let s3bucket_api: Api<S3Bucket> = Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
let cm_api: Api<ConfigMap> = Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
let s3in_api: Api<S3Instance> = Api::all(ctx.client.clone());
info!("Getting the S3Bucket resource");
let mut s3bucket = match s3bucket_api.get(&obj.name_any()).await {
Ok(s3bucket) => s3bucket,
Err(Error::Api(ae)) if ae.code == 404 => {
info!("Object is not found, probably removed");
return Ok(Action::await_change());
}
Err(err) => {
error!("{}", err);
return Err(S3BucketError::KubeError(err));
}
};
// On the first reconciliation status is None
// it needs to be initialized
let mut status = match s3bucket.clone().status {
None => {
info!("Status is not yet set, initializing the object");
return init_object(s3bucket, s3bucket_api).await;
}
Some(status) => status,
};
let configmap_name = format!("{}-bucket-info", s3bucket.name_any());
info!("Getting the configmap");
// Get the cm, if it's already there, we need to validate, or create an empty one
let mut configmap = match get_configmap(cm_api.clone(), &configmap_name).await {
Ok(configmap) => configmap,
Err(Error::Api(ae)) if ae.code == 404 => {
info!("ConfigMap is not found, creating a new one");
let cm = ConfigMap{
metadata: ObjectMeta {
name: Some(configmap_name),
namespace: Some(s3bucket.clone().namespace().unwrap()),
..Default::default()
},
..Default::default()
};
match create_configmap(cm_api.clone(), cm).await {
Ok(cm) => cm,
Err(err) => {
error!("{}", err);
return Err(S3BucketError::KubeError(err));
},
}
}
Err(err) => {
error!("{}", err);
return Err(S3BucketError::KubeError(err));
},
};
info!("Labeling the configmap");
configmap = match label_configmap(cm_api.clone(), &s3bucket.name_any(), configmap).await {
Ok(configmap) => configmap,
Err(err) => {
error!("{}", err);
return Err(S3BucketError::KubeError(err));
},
};
info!("Setting owner references to the configmap");
if s3bucket.spec.own_configmap {
configmap = match own_configmap(cm_api.clone(), s3bucket.clone(), configmap).await {
Ok(configmap) => configmap,
Err(err) => {
error!("{}", err);
return Err(S3BucketError::KubeError(err));
},
};
};
info!("Getting the S3Intsance");
let s3in = match s3in_api.get(&s3bucket.spec.instance).await {
Ok(s3in) => s3in,
Err(err) => {
error!("{}", err);
return Err(S3BucketError::KubeError(err));
}
};
info!("Updating the ConfigMap");
if let Err(err) = ensure_data_configmap(cm_api.clone(), s3in.clone(), configmap.clone()).await {
error!("{}", err);
return Err(S3BucketError::KubeError(err));
};
info!("Getting the s3instance secret");
let secret_ns = s3in.clone().spec.credentials_secret.namespace;
let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
let secret = match s3_instance::get_secret(secret_api.clone(), s3in.clone()).await {
Ok(secret) => secret,
Err(err) => {
error!("{}", err);
return Err(S3BucketError::KubeError(err));
}
};
info!("Getting data from the secret");
// Getting data from the secret to initialize the clinet
let data = match secret.data {
Some(data) => data,
None => {
let err = anyhow::Error::msg("empty data");
error!("{}", err);
return Err(S3BucketError::InvalidSecret(err));
}
};
let access_key = match data.get(s3_instance::ACCESS_KEY) {
Some(access_key) => String::from_utf8(access_key.0.clone()).unwrap(),
None => {
let err = anyhow::Error::msg("empty access key");
error!("{}", err);
return Err(S3BucketError::InvalidSecret(err));
}
};
let secret_key = match data.get(s3_instance::SECRET_KEY) {
Some(secret_key) => String::from_utf8(secret_key.0.clone()).unwrap(),
None => {
let err = anyhow::Error::msg("empty secret key");
error!("{}", err);
return Err(S3BucketError::InvalidSecret(err));
}
};
info!("Creating an s3 client");
let s3_client = S3Api::new(
access_key,
secret_key,
s3in.clone().spec.endpoint.to_string(),
s3in.clone().spec.region.to_string(),
s3in.clone().spec.force_path_style,
)
.await;
info!("Getting buckets");
let buckets = match s3_client.clone().list_buckets().await {
Ok(buckets) => buckets,
Err(err) => {
error!("{}", err);
return Err(S3BucketError::IllegalS3Bucket);
}
};
let bucket_name = format!("{}-{}", s3bucket.namespace().unwrap(), s3bucket.name_any());
if buckets.contains(&bucket_name) {
info!("Bucket already exists");
return Ok(Action::await_change());
}
if let Err(err) = s3_client.create_buckets(bucket_name).await {
error!("{}", err);
return Err(S3BucketError::IllegalS3Bucket);
}
status.ready = true;
status.objects_buckets = None;
status.endpoint = Some(s3in.clone().spec.endpoint);
status.size = None;
status.region = Some(s3in.spec.region);
s3bucket.status = Some(status);
info!("Updating status of the s3bucket resource");
match s3bucket_api
.replace_status(&s3bucket.name_any(), &PostParams::default(), &s3bucket)
.await
{
Ok(_) => {
return Ok(Action::requeue(Duration::from_secs(120)));
}
Err(err) => {
error!("{}", err);
return Err(S3BucketError::KubeError(err));
}
};
}
// Bootstrap the object by adding a default status to it
async fn init_object(mut obj: S3Bucket, api: Api<S3Bucket>) -> Result<Action, S3BucketError> {
let mut conditions = set_condition(
vec![],
obj.metadata.clone(),
TYPE_INSTANCE_CONNECTED,
"Unknown".to_string(),
"Reconciling".to_string(),
"Reconciliation started".to_string(),
);
conditions = set_condition(
conditions,
obj.metadata.clone(),
TYPE_BUCKET_READY,
"Unknown".to_string(),
"Reconciling".to_string(),
"Reconciliation started".to_string(),
);
conditions = set_condition(
conditions,
obj.metadata.clone(),
TYPE_CONFIGMAP_READY,
"Unknown".to_string(),
"Reconciling".to_string(),
"Reconciliation started".to_string(),
);
obj.status = Some(S3BucketStatus {
conditions,
..S3BucketStatus::default()
});
match api
.replace_status(obj.clone().name_any().as_str(), &Default::default(), &obj)
.await
{
Ok(_) => Ok(Action::await_change()),
Err(err) => {
error!("{}", err);
Err(S3BucketError::KubeError(err))
}
}
}
// Get the configmap with the bucket data
async fn get_configmap(api: Api<ConfigMap>, name: &str) -> Result<ConfigMap, kube::Error> {
info!("Getting a configmap: {}", name);
match api.get(name).await {
Ok(cm) => Ok(cm),
Err(err) => Err(err),
}
}
// Create ConfigMap
async fn create_configmap(api: Api<ConfigMap>, cm: ConfigMap) -> Result<ConfigMap, kube::Error> {
match api.create(&PostParams::default(), &cm).await {
Ok(cm) => get_configmap(api, &cm.name_any()).await,
Err(err) => Err(err),
}
}
async fn label_configmap(
api: Api<ConfigMap>,
s3bucket_name: &str,
mut cm: ConfigMap,
) -> Result<ConfigMap, kube::Error> {
let mut labels = match &cm.clone().metadata.labels {
Some(labels) => labels.clone(),
None => {
let map: BTreeMap<String, String> = BTreeMap::new();
map
}
};
labels.insert(CONFIGMAP_LABEL.to_string(), s3bucket_name.to_string());
cm.metadata.labels = Some(labels);
api.replace(&cm.name_any(), &PostParams::default(), &cm)
.await?;
let cm = match api.get(&cm.name_any()).await {
Ok(cm) => cm,
Err(err) => {
return Err(err);
}
};
Ok(cm)
}
async fn own_configmap(
api: Api<ConfigMap>,
s3bucket: S3Bucket,
mut cm: ConfigMap,
) -> Result<ConfigMap, kube::Error> {
let mut owner_references = match &cm.clone().metadata.owner_references {
Some(owner_references) => owner_references.clone(),
None => {
let owner_references: Vec<OwnerReference> = vec![];
owner_references
}
};
if owner_references.iter().find(|or| or.uid == s3bucket.uid().unwrap()).is_some() {
return Ok(cm);
}
let new_owner_reference = OwnerReference{
api_version: S3Bucket::api_version(&()).into(),
kind: S3Bucket::kind(&()).into(),
name: s3bucket.name_any(),
uid: s3bucket.uid().unwrap(),
..Default::default()
};
owner_references.push(new_owner_reference);
cm.metadata.owner_references = Some(owner_references);
api.replace(&cm.name_any(), &PostParams::default(), &cm)
.await?;
let cm = match api.get(&cm.name_any()).await {
Ok(cm) => cm,
Err(err) => {
return Err(err);
}
};
Ok(cm)
}
async fn ensure_data_configmap(
api: Api<ConfigMap>,
s3in: S3Instance,
mut cm: ConfigMap,
) -> Result<ConfigMap, kube::Error> {
let mut data = match &cm.clone().data {
Some(data) => data.clone(),
None => {
let map: BTreeMap<String, String> = BTreeMap::new();
map
}
};
data.insert(AWS_REGION.to_string(), s3in.spec.region);
data.insert(AWS_ENDPOINT_URL.to_string(), s3in.spec.endpoint);
cm.data = Some(data);
api.replace(&cm.name_any(), &PostParams::default(), &cm)
.await?;
match api.get(&cm.name_any()).await {
Ok(cm) => Ok(cm),
Err(err) => Err(err),
}
}
pub(crate) fn error_policy(_: Arc<S3Bucket>, _: &S3BucketError, _: Arc<Context>) -> Action {
Action::requeue(Duration::from_secs(5 * 60))
}
#[instrument(skip(client), fields(trace_id))]
pub async fn run(client: Client) {
let s3buckets = Api::<S3Bucket>::all(client.clone());
if let Err(err) = s3buckets.list(&ListParams::default().limit(1)).await {
error!("{}", err);
std::process::exit(1);
}
let recorder = Recorder::new(client.clone(), "s3bucket-controller".into());
let context = Context { client, recorder };
Controller::new(s3buckets, Config::default().any_semantic())
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(context))
.filter_map(|x| async move { std::result::Result::ok(x) })
.for_each(|_| futures::future::ready(()))
.await;
}
// Context for our reconciler
#[derive(Clone)]
pub(crate) struct Context {
/// Kubernetes client
pub client: Client,
/// Event recorder
pub recorder: Recorder,
}
#[derive(Error, Debug)]
pub enum S3BucketError {
#[error("SerializationError: {0}")]
SerializationError(#[source] serde_json::Error),
#[error("Kube Error: {0}")]
KubeError(#[source] kube::Error),
#[error("Finalizer Error: {0}")]
// NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
// so boxing this error to break cycles
FinalizerError(#[source] Box<kube::runtime::finalizer::Error<S3BucketError>>),
#[error("IllegalS3Bucket")]
IllegalS3Bucket,
#[error("SecretIsAlreadyLabeled")]
SecretIsAlreadyLabeled,
#[error("Invalid Secret: {0}")]
InvalidSecret(#[source] anyhow::Error),
}
pub type S3BucketResult<T, E = S3BucketError> = std::result::Result<T, E>;

View File

@@ -1,4 +1,4 @@
use crate::conditions::{is_condition_unknown, set_condition};
use crate::conditions::{is_condition_true, is_condition_unknown, set_condition};
use crate::s3::S3Client;
use crate::s3::s3::S3Api;
use api::api::v1beta1::s3_instance::{S3Instance, S3InstanceStatus};
@@ -18,13 +18,15 @@ use tracing::*;
const TYPE_CONNECTED: &str = "Connected";
const TYPE_SECRET_LABELED: &str = "SecretLabeled";
const FIN_SECRET_LABEL: &str = "s3.badhouseplants.net/s3-label";
const SECRET_LABEL: &str = "s3.badhouseplants.net/s3-instance";
const ACCESS_KEY: &str = "ACCESS_KEY";
const SECRET_KEY: &str = "SECRET_KEY";
pub(crate) const ACCESS_KEY: &str = "ACCESS_KEY";
pub(crate) const SECRET_KEY: &str = "SECRET_KEY";
#[instrument(skip(ctx, obj), fields(trace_id))]
pub(crate) async fn reconcile(obj: Arc<S3Instance>, ctx: Arc<Context>) -> S3InstanceResult<Action> {
info!("Staring reconciling");
info!("Getting the S3Instance resource");
let s3_api: Api<S3Instance> = Api::all(ctx.client.clone());
let mut s3in = match s3_api.get(obj.name_any().as_str()).await {
Ok(res) => res,
@@ -38,10 +40,8 @@ pub(crate) async fn reconcile(obj: Arc<S3Instance>, ctx: Arc<Context>) -> S3Inst
}
};
if s3in.metadata.deletion_timestamp.is_some() {
info!("Object is marked for deletion");
return Ok(Action::await_change());
}
let secret_ns = s3in.clone().spec.credentials_secret.namespace;
let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
let mut status = match s3in.clone().status {
None => {
@@ -51,7 +51,7 @@ pub(crate) async fn reconcile(obj: Arc<S3Instance>, ctx: Arc<Context>) -> S3Inst
Some(status) => status,
};
let secret = match get_secret(ctx.clone(), s3in.clone()).await {
let secret = match get_secret(secret_api.clone(), s3in.clone()).await {
Ok(secret) => secret,
Err(err) => {
error!("{}", err);
@@ -71,7 +71,58 @@ pub(crate) async fn reconcile(obj: Arc<S3Instance>, ctx: Arc<Context>) -> S3Inst
return Err(S3InstanceError::KubeError(err));
}
};
if s3in.metadata.deletion_timestamp.is_some() {
info!("Object is marked for deletion");
if let Some(mut finalizers) = s3in.clone().metadata.finalizers {
if finalizers.contains(&FIN_SECRET_LABEL.to_string()) {
match unlabel_secret(ctx.clone(), s3in.clone(), secret).await {
Ok(_) => {
if let Some(index) = finalizers.iter().position(|x| *x == FIN_SECRET_LABEL.to_string()) {
finalizers.remove(index);
};
},
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::KubeError(err));
},
};
}
s3in.metadata.finalizers = Some(finalizers);
};
match s3_api.replace(&s3in.name_any(), &PostParams::default(), &s3in).await {
Ok(_) => {
return Ok(Action::await_change());
},
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::KubeError(err))
},
}
}
if is_condition_true(status.clone().conditions, TYPE_SECRET_LABELED) {
let mut current_finalizers = match s3in.clone().metadata.finalizers {
Some(finalizers) => finalizers,
None => vec![],
};
if !current_finalizers.contains(&FIN_SECRET_LABEL.to_string()) {
info!("Adding a finalizer");
current_finalizers.push(FIN_SECRET_LABEL.to_string());
s3in.metadata.finalizers = Some(current_finalizers);
match s3_api.replace(&s3in.name_any(), &PostParams::default(), &s3in).await {
Ok(_) => {
return Ok(Action::await_change());
},
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::KubeError(err))
},
}
}
}
if is_condition_unknown(status.clone().conditions, TYPE_SECRET_LABELED) {
if is_secret_labeled(secret.clone()) {
if is_secret_labeled_by_another_obj(s3in.clone(), secret.clone()) {
@@ -88,7 +139,6 @@ pub(crate) async fn reconcile(obj: Arc<S3Instance>, ctx: Arc<Context>) -> S3Inst
"Reconciled".to_string(),
"Secret is already labeled".to_string(),
);
s3in.status = Some(status.clone());
}
} else {
info!("Labeling the secret");
@@ -105,6 +155,7 @@ pub(crate) async fn reconcile(obj: Arc<S3Instance>, ctx: Arc<Context>) -> S3Inst
"Secret is labeled".to_string(),
);
};
s3in.status = Some(status);
match s3_api
.replace_status(&s3in.name_any(), &PostParams::default(), &s3in)
.await {
@@ -190,7 +241,6 @@ pub(crate) async fn reconcile(obj: Arc<S3Instance>, ctx: Arc<Context>) -> S3Inst
}
};
info!("{:?}", buckets);
status.ready = true;
status.buckets = Some(buckets.clone());
status.total_buckets = Some(buckets.len());
@@ -251,9 +301,7 @@ async fn init_object(mut obj: S3Instance, api: Api<S3Instance>) -> Result<Action
}
// Get the secret with credentials
async fn get_secret(ctx: Arc<Context>, obj: S3Instance) -> Result<Secret, kube::Error> {
let secret_ns = obj.clone().spec.credentials_secret.namespace;
let api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
pub(crate) async fn get_secret(api: Api<Secret>, obj: S3Instance) -> Result<Secret, kube::Error> {
let secret = match api.get(&obj.spec.credentials_secret.name).await {
Ok(secret) => secret,
@@ -265,6 +313,21 @@ async fn get_secret(ctx: Arc<Context>, obj: S3Instance) -> Result<Secret, kube::
Ok(secret)
}
async fn unlabel_secret(
ctx: Arc<Context>,
obj: S3Instance,
mut secret: Secret,
) -> Result<(), kube::Error> {
let secret_ns = obj.clone().spec.credentials_secret.namespace;
let api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
if let Some(mut labels) = secret.clone().metadata.labels {
labels.remove(&SECRET_LABEL.to_string());
secret.metadata.labels = Some(labels);
api.replace(&secret.name_any(), &PostParams::default(), &secret)
.await?;
}
Ok(())
}
async fn label_secret(
ctx: Arc<Context>,
obj: S3Instance,
@@ -333,32 +396,23 @@ pub(crate) fn error_policy(_: Arc<S3Instance>, _: &S3InstanceError, _: Arc<Conte
Action::requeue(Duration::from_secs(5 * 60))
}
#[instrument(skip(client, state), fields(trace_id))]
pub async fn run(client: Client, state: State) {
#[instrument(skip(client), fields(trace_id))]
pub async fn run(client: Client) {
let s3instances = Api::<S3Instance>::all(client.clone());
if let Err(err) = s3instances.list(&ListParams::default().limit(1)).await {
error!("{}", err);
std::process::exit(1);
}
let recorder = Recorder::new(client.clone(), "s3instance-controller".into());
let context = Context{ client, recorder };
Controller::new(s3instances, Config::default().any_semantic())
.shutdown_on_signal()
.run(reconcile, error_policy, state.to_context(client).await)
.run(reconcile, error_policy, Arc::new(context))
.filter_map(|x| async move { std::result::Result::ok(x) })
.for_each(|_| futures::future::ready(()))
.await;
}
/// State shared between the controller and the web server
#[derive(Clone, Default)]
pub(crate) struct State {}
impl State {
pub async fn to_context(&self, client: Client) -> Arc<Context> {
Arc::new(Context {
client: client.clone(),
recorder: Recorder::new(client, "s3instance-controller".into()),
})
}
}
// Context for our reconciler
#[derive(Clone)]
pub(crate) struct Context {

View File

@@ -1,8 +1,9 @@
use api::api::v1beta1::s3_instance::S3Instance;
use api::api::v1beta1::{s3_bucket::S3Bucket, s3_instance::S3Instance};
use kube::CustomResourceExt;
fn main() {
println!(
"---\n{}",
serde_yaml::to_string(&S3Instance::crd()).unwrap()
);
println!("---\n{}", serde_yaml::to_string(&S3Bucket::crd()).unwrap());
}

View File

@@ -5,4 +5,5 @@ pub(crate) mod s3;
pub(crate) trait S3Client {
async fn list_buckets(self) -> Result<Vec<String>, Error>;
async fn create_buckets(self, bucket_name: String ) -> Result<(), Error>;
}

View File

@@ -2,10 +2,10 @@ use aws_config::{BehaviorVersion, Region};
use aws_credential_types::Credentials;
use aws_sdk_s3::config::Builder;
use aws_sdk_s3::{Client, config::SharedCredentialsProvider};
use tracing::*;
use crate::s3::S3Client;
#[derive(Clone)]
pub(crate) struct S3Api {
client: aws_sdk_s3::Client,
}
@@ -18,8 +18,6 @@ impl S3Api {
region: String,
force_path_style: bool,
) -> Self {
info!(access_key);
info!(secret_key);
let creds = Credentials::new(access_key, secret_key, None, None, "static");
let config = aws_config::defaults(BehaviorVersion::latest())
.credentials_provider(SharedCredentialsProvider::new(creds))
@@ -51,7 +49,6 @@ impl S3Client for S3Api {
})
},
Err(err) => {
error!{"{}", err};
return Err(err.into());
},
};
@@ -62,4 +59,11 @@ impl S3Client for S3Api {
};
Ok(result)
}
async fn create_buckets(self, bucket_name: String) -> Result<(), anyhow::Error> {
match self.client.create_bucket().bucket(bucket_name).send().await {
Ok(_) => Ok(()),
Err(err) => Err(err.into()),
}
}
}