Init Project #1
1
operator/.gitignore
vendored
Normal file
1
operator/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
/target
|
||||
5228
operator/Cargo.lock
generated
Normal file
5228
operator/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
63
operator/Cargo.toml
Normal file
63
operator/Cargo.toml
Normal file
@@ -0,0 +1,63 @@
|
||||
[package]
|
||||
name = "s3-operator"
|
||||
version = "0.1.0"
|
||||
authors = ["allanger <iam@allanger.xyz>"]
|
||||
edition = "2024"
|
||||
default-run = "controller"
|
||||
license = "GPLv3"
|
||||
publish = false
|
||||
|
||||
[[bin]]
|
||||
doc = false
|
||||
name = "controller"
|
||||
path = "src/controller.rs"
|
||||
|
||||
[[bin]]
|
||||
doc = false
|
||||
name = "crdgen"
|
||||
path = "src/crdgen.rs"
|
||||
|
||||
[lib]
|
||||
name = "api"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
kube = { version = "3.0.1", features = ["runtime", "derive", "client", "aws-lc-rs"] }
|
||||
k8s-openapi = { version = "0.27.0", features = ["latest", "schemars"] }
|
||||
schemars = { version = "1" }
|
||||
darling = "0.23.0"
|
||||
clap = { version = "4.5.60", features = ["derive"] }
|
||||
serde = { version = "1.0.228", features = ["serde_derive"] }
|
||||
serde_json = "1.0.149"
|
||||
serde_yaml = "0.9.34"
|
||||
thiserror = "2.0.18"
|
||||
tracing = "0.1.44"
|
||||
tokio = { version = "1.49.0", features = ["macros", "rt-multi-thread"] }
|
||||
anyhow = "1.0.102"
|
||||
futures = "0.3.32"
|
||||
actix-web = "4.13.0"
|
||||
tracing-subscriber = { version = "0.3.22", features = ["json", "env-filter"] }
|
||||
aws-config = { version = "1.8.15", features = ["behavior-version-latest", "rustls"] }
|
||||
aws-sdk-s3 = "1.125.0"
|
||||
aws-credential-types = "1.2.14"
|
||||
minio = "0.3.0"
|
||||
reqwest = { version = "0.13.2", features = ["json"] }
|
||||
base64 = "0.22.1"
|
||||
rand = "0.10.0"
|
||||
aws-sigv4 = { version = "1.4.2", features = ["sigv4a"] }
|
||||
http = "1"
|
||||
aws-sig-auth = "0.60.3"
|
||||
aws-smithy-http = "0.63.6"
|
||||
aws-types = "1.3.14"
|
||||
hmac = "0.12.1"
|
||||
sha2 = "0.10.9"
|
||||
hex = "0.4.3"
|
||||
async-trait = "0.1.89"
|
||||
|
||||
[dev-dependencies]
|
||||
assert-json-diff = "2.0.2"
|
||||
envtest = "0.1.2"
|
||||
http = "1"
|
||||
hyper = "1"
|
||||
tower-test = "0.4.0"
|
||||
|
||||
10
operator/manifests/s3_bucket.yaml
Normal file
10
operator/manifests/s3_bucket.yaml
Normal file
@@ -0,0 +1,10 @@
|
||||
---
|
||||
apiVersion: s3.badhouseplants.net/v1beta1
|
||||
kind: S3Bucket
|
||||
metadata:
|
||||
name: test
|
||||
namespace: default
|
||||
spec:
|
||||
instance: test
|
||||
cleanup: true
|
||||
ownConfigmap: false
|
||||
9
operator/manifests/s3_bucket_user.yaml
Normal file
9
operator/manifests/s3_bucket_user.yaml
Normal file
@@ -0,0 +1,9 @@
|
||||
---
|
||||
apiVersion: s3.badhouseplants.net/v1beta1
|
||||
kind: S3BucketUser
|
||||
metadata:
|
||||
name: test
|
||||
namespace: default
|
||||
spec:
|
||||
bucket: test
|
||||
policy: readWrite
|
||||
22
operator/manifests/s3_instance.yaml
Normal file
22
operator/manifests/s3_instance.yaml
Normal file
@@ -0,0 +1,22 @@
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Secret
|
||||
metadata:
|
||||
name: test
|
||||
namespace: default
|
||||
stringData:
|
||||
ACCESS_KEY: overlord
|
||||
SECRET_KEY: 's1cdlej3#^&LQetsQ8GUYix7ypLf#$#$wsdf'
|
||||
---
|
||||
apiVersion: s3.badhouseplants.net/v1beta1
|
||||
kind: S3Instance
|
||||
metadata:
|
||||
name: test
|
||||
spec:
|
||||
endpoint: https://rustfs.badhouseplants.net
|
||||
forcePathStyle: true
|
||||
region: us-east-1
|
||||
provider: rustfs
|
||||
credentialsSecret:
|
||||
namespace: default
|
||||
name: test
|
||||
1
operator/src/api/mod.rs
Normal file
1
operator/src/api/mod.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod v1beta1;
|
||||
3
operator/src/api/v1beta1/mod.rs
Normal file
3
operator/src/api/v1beta1/mod.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
pub mod s3_bucket;
|
||||
pub mod s3_bucket_user;
|
||||
pub mod s3_instance;
|
||||
53
operator/src/api/v1beta1/s3_bucket.rs
Normal file
53
operator/src/api/v1beta1/s3_bucket.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
|
||||
use k8s_openapi::serde::{Deserialize, Serialize};
|
||||
use kube::CustomResource;
|
||||
use kube::{self};
|
||||
use schemars::JsonSchema;
|
||||
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[kube(
|
||||
kind = "S3Bucket",
|
||||
group = "s3.badhouseplants.net",
|
||||
version = "v1beta1",
|
||||
shortname = "bucket",
|
||||
doc = "Manage buckets on the s3 instance",
|
||||
namespaced,
|
||||
status = "S3BucketStatus",
|
||||
printcolumn = r#"{"name":"Instance","type":"string","description":"On which instance this bucket is created","jsonPath":".spec.instance"}"#,
|
||||
printcolumn = r#"{"name":"Region","type":"string","description":"The region of the bucket","jsonPath":".status.region"}"#,
|
||||
printcolumn = r#"{"name":"Bucket Name","type":"string","description":"The full name of the bucket","jsonPath":".status.bucketName"}"#,
|
||||
printcolumn = r#"{"name":"Total Objects","type":"number","description":"How many objects are there in the bucket","jsonPath":".status.totalObjects"}"#,
|
||||
printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct S3BucketSpec {
|
||||
/// On which instance this bucket should be created
|
||||
pub instance: String,
|
||||
/// Should perform a cleanup on delete?
|
||||
/// It will remove all objects from the bucket
|
||||
#[serde(default)]
|
||||
pub cleanup: bool,
|
||||
/// Should set the owner reference on the CM
|
||||
#[serde(default)]
|
||||
pub own_configmap: bool,
|
||||
}
|
||||
|
||||
/// The status object of `DbInstance`
|
||||
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct S3BucketStatus {
|
||||
/// Is this bucket ready.
|
||||
#[serde(default)]
|
||||
pub ready: bool,
|
||||
pub conditions: Vec<Condition>,
|
||||
#[serde(default)]
|
||||
pub size: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub total_objects: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub endpoint: Option<String>,
|
||||
#[serde(default)]
|
||||
pub region: Option<String>,
|
||||
#[serde(default)]
|
||||
pub bucket_name: Option<String>,
|
||||
}
|
||||
40
operator/src/api/v1beta1/s3_bucket_user.rs
Normal file
40
operator/src/api/v1beta1/s3_bucket_user.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
|
||||
use k8s_openapi::serde::{Deserialize, Serialize};
|
||||
use kube::CustomResource;
|
||||
use kube::{self};
|
||||
use schemars::JsonSchema;
|
||||
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[kube(
|
||||
kind = "S3BucketUser",
|
||||
group = "s3.badhouseplants.net",
|
||||
version = "v1beta1",
|
||||
shortname = "s3bu",
|
||||
doc = "Manage users that have access to s3 buckets",
|
||||
namespaced,
|
||||
status = "S3BucketUserStatus",
|
||||
printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct S3BucketUserSpec {
|
||||
/// To which bucket access should be provided
|
||||
pub bucket: String,
|
||||
pub policy: String,
|
||||
/// Should perform a cleanup on delete?
|
||||
#[serde(default)]
|
||||
pub cleanup: bool,
|
||||
/// Should set the owner reference on the Secret
|
||||
#[serde(default)]
|
||||
pub own_secret: bool,
|
||||
}
|
||||
|
||||
/// The status object of `DbInstance`
|
||||
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
|
||||
pub struct S3BucketUserStatus {
|
||||
/// Is this bucket ready.
|
||||
#[serde(default)]
|
||||
pub ready: bool,
|
||||
pub conditions: Vec<Condition>,
|
||||
#[serde(default)]
|
||||
pub access_key: Option<String>,
|
||||
}
|
||||
57
operator/src/api/v1beta1/s3_instance.rs
Normal file
57
operator/src/api/v1beta1/s3_instance.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
|
||||
use k8s_openapi::serde::{Deserialize, Serialize};
|
||||
use kube::CustomResource;
|
||||
use kube::{self};
|
||||
use schemars::JsonSchema;
|
||||
|
||||
#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)]
|
||||
pub enum Provider {
|
||||
#[serde(rename = "minio")]
|
||||
Minio,
|
||||
#[serde(rename = "rustfs")]
|
||||
Rustfs,
|
||||
}
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[kube(
|
||||
kind = "S3Instance",
|
||||
group = "s3.badhouseplants.net",
|
||||
version = "v1beta1",
|
||||
shortname = "s3in",
|
||||
doc = "Connect the operator to any s3 backend using this resource",
|
||||
status = "S3InstanceStatus",
|
||||
printcolumn = r#"{"name":"Endpoint","type":"string","description":"The URL of the instance","jsonPath":".spec.endpoint"}"#,
|
||||
printcolumn = r#"{"name":"Region","type":"string","description":"The region of the instance","jsonPath":".spec.region"}"#,
|
||||
printcolumn = r#"{"name":"Path Style","type":"boolean","description":"Is forcing path style","jsonPath":".spec.forcePathStyle"}"#,
|
||||
printcolumn = r#"{"name":"Total Buckets","type":"number","description":"How many buckets are there on the instance","jsonPath":".status.total_buckets"}"#,
|
||||
printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct S3InstanceSpec {
|
||||
pub endpoint: String,
|
||||
pub region: String,
|
||||
pub credentials_secret: NamespacedName,
|
||||
#[serde(default)]
|
||||
pub force_path_style: bool,
|
||||
pub provider: Provider,
|
||||
}
|
||||
|
||||
/// The status object of `DbInstance`
|
||||
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
|
||||
pub struct S3InstanceStatus {
|
||||
#[serde(default)]
|
||||
pub ready: bool,
|
||||
//#[schemars(schema_with = "conditions")]
|
||||
pub conditions: Vec<Condition>,
|
||||
#[serde(default)]
|
||||
pub buckets: Option<Vec<String>>,
|
||||
#[serde(default)]
|
||||
pub total_buckets: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
pub struct NamespacedName {
|
||||
#[serde(rename = "namespace")]
|
||||
pub namespace: String,
|
||||
#[serde(rename = "name")]
|
||||
pub name: String,
|
||||
}
|
||||
51
operator/src/conditions.rs
Normal file
51
operator/src/conditions.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
|
||||
use k8s_openapi::jiff::Timestamp;
|
||||
use kube::api::ObjectMeta;
|
||||
|
||||
pub(crate) fn set_condition(
|
||||
mut conditions: Vec<Condition>,
|
||||
metadata: ObjectMeta,
|
||||
condition_type: &str,
|
||||
condition_status: String,
|
||||
condition_reason: String,
|
||||
condition_message: String,
|
||||
) -> Vec<Condition> {
|
||||
if let Some(condition) = conditions.iter_mut().find(|c| c.type_ == condition_type) {
|
||||
condition.status = condition_status;
|
||||
condition.last_transition_time = Time::from(Timestamp::now());
|
||||
condition.message = condition_message;
|
||||
condition.reason = condition_reason;
|
||||
condition.observed_generation = metadata.generation;
|
||||
} else {
|
||||
conditions.push(Condition {
|
||||
last_transition_time: Time::from(Timestamp::now()),
|
||||
message: condition_message,
|
||||
observed_generation: metadata.generation,
|
||||
reason: condition_reason,
|
||||
status: condition_status,
|
||||
type_: condition_type.to_string(),
|
||||
});
|
||||
}
|
||||
conditions
|
||||
}
|
||||
|
||||
pub(crate) fn is_condition_true(mut conditions: Vec<Condition>, condition_type: &str) -> bool {
|
||||
if let Some(condition) = conditions.iter_mut().find(|c| c.type_ == condition_type) {
|
||||
return condition.status == "True";
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub(crate) fn is_condition_false(mut conditions: Vec<Condition>, condition_type: &str) -> bool {
|
||||
if let Some(condition) = conditions.iter_mut().find(|c| c.type_ == condition_type) {
|
||||
return condition.status == "False";
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub(crate) fn is_condition_unknown(mut conditions: Vec<Condition>, condition_type: &str) -> bool {
|
||||
if let Some(condition) = conditions.iter_mut().find(|c| c.type_ == condition_type) {
|
||||
return condition.status == "Unknown";
|
||||
}
|
||||
false
|
||||
}
|
||||
66
operator/src/controller.rs
Normal file
66
operator/src/controller.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
mod conditions;
|
||||
mod controllers;
|
||||
mod providers;
|
||||
mod s3;
|
||||
use crate::controllers::{s3_bucket, s3_bucket_user, s3_instance};
|
||||
|
||||
use actix_web::{App, HttpRequest, HttpResponse, HttpServer, Responder, get, middleware};
|
||||
use clap::Parser;
|
||||
use kube::Client;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
/// Simple program to greet a person
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version, about, long_about = None)]
|
||||
struct Args {
|
||||
#[arg(long, default_value_t = 60000)]
|
||||
/// The address the metric endpoint binds to.
|
||||
metrics_port: u16,
|
||||
#[arg(long, default_value_t = 8081)]
|
||||
/// The address the probe endpoint binds to.
|
||||
health_probe_port: u16,
|
||||
#[arg(long, default_value_t = true)]
|
||||
/// Enabling this will ensure there is only one active controller manager.
|
||||
// DB Operator feature flags
|
||||
#[arg(long, default_value_t = false)]
|
||||
/// If enabled, DB Operator will run full reconciliation only
|
||||
/// when changes are detected
|
||||
is_change_check_nabled: bool,
|
||||
}
|
||||
|
||||
#[get("/health")]
|
||||
async fn health(_: HttpRequest) -> impl Responder {
|
||||
HttpResponse::Ok().json("healthy")
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.json()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.init();
|
||||
let client = Client::try_default()
|
||||
.await
|
||||
.expect("failed to create kube Client");
|
||||
let s3in_controller = s3_instance::run(client.clone());
|
||||
let s3bucket_controller = s3_bucket::run(client.clone());
|
||||
let s3bucketuser_controller = s3_bucket_user::run(client.clone());
|
||||
// Start web server
|
||||
let server = HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(middleware::Logger::default().exclude("/health"))
|
||||
.service(health)
|
||||
})
|
||||
.bind("0.0.0.0:8080")?
|
||||
.shutdown_timeout(5);
|
||||
|
||||
// Both runtimes implements graceful shutdown, so poll until both are done
|
||||
tokio::join!(
|
||||
s3in_controller,
|
||||
s3bucket_controller,
|
||||
s3bucketuser_controller,
|
||||
server.run()
|
||||
)
|
||||
.3?;
|
||||
Ok(())
|
||||
}
|
||||
3
operator/src/controllers/mod.rs
Normal file
3
operator/src/controllers/mod.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
pub(crate) mod s3_bucket;
|
||||
pub(crate) mod s3_bucket_user;
|
||||
pub(crate) mod s3_instance;
|
||||
489
operator/src/controllers/s3_bucket.rs
Normal file
489
operator/src/controllers/s3_bucket.rs
Normal file
@@ -0,0 +1,489 @@
|
||||
use crate::conditions::{is_condition_true, set_condition};
|
||||
use crate::controllers::s3_instance;
|
||||
use crate::s3::S3Client;
|
||||
use crate::s3::s3api::S3Api;
|
||||
use api::api::v1beta1::s3_bucket::{S3Bucket, S3BucketStatus};
|
||||
use api::api::v1beta1::s3_instance::S3Instance;
|
||||
use futures::StreamExt;
|
||||
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
|
||||
use kube::api::{ListParams, ObjectMeta, PostParams};
|
||||
use kube::runtime::Controller;
|
||||
use kube::runtime::controller::Action;
|
||||
use kube::runtime::events::Recorder;
|
||||
use kube::runtime::watcher::Config;
|
||||
use kube::{Api, Client, Error, Resource, ResourceExt};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tracing::*;
|
||||
|
||||
const TYPE_BUCKET_READY: &str = "BucketReady";
|
||||
const FIN_CLEANUP: &str = "s3.badhouseplants.net/bucket-cleanup";
|
||||
const CONFIGMAP_LABEL: &str = "s3.badhouseplants.net/s3-bucket";
|
||||
|
||||
const AWS_REGION: &str = "AWS_REGION";
|
||||
const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL";
|
||||
|
||||
#[instrument(skip(ctx, obj), fields(trace_id))]
|
||||
pub(crate) async fn reconcile(obj: Arc<S3Bucket>, ctx: Arc<Context>) -> S3BucketResult<Action> {
|
||||
info!("Staring reconciling");
|
||||
let s3bucket_api: Api<S3Bucket> =
|
||||
Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
|
||||
let cm_api: Api<ConfigMap> = Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
|
||||
let s3in_api: Api<S3Instance> = Api::all(ctx.client.clone());
|
||||
|
||||
info!("Getting the S3Bucket resource");
|
||||
let mut s3bucket = match s3bucket_api.get(&obj.name_any()).await {
|
||||
Ok(s3bucket) => s3bucket,
|
||||
Err(Error::Api(ae)) if ae.code == 404 => {
|
||||
info!("Object is not found, probably removed");
|
||||
return Ok(Action::await_change());
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
// On the first reconciliation status is None
|
||||
// it needs to be initialized
|
||||
let mut status = match s3bucket.clone().status {
|
||||
None => {
|
||||
info!("Status is not yet set, initializing the object");
|
||||
return init_object(s3bucket, s3bucket_api).await;
|
||||
}
|
||||
Some(status) => status,
|
||||
};
|
||||
|
||||
let configmap_name = format!("{}-bucket-info", s3bucket.name_any());
|
||||
|
||||
info!("Getting the configmap");
|
||||
// Get the cm, if it's already there, we need to validate, or create an empty one
|
||||
let mut configmap = match get_configmap(cm_api.clone(), &configmap_name).await {
|
||||
Ok(configmap) => configmap,
|
||||
Err(Error::Api(ae)) if ae.code == 404 => {
|
||||
info!("ConfigMap is not found, creating a new one");
|
||||
let cm = ConfigMap {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(configmap_name),
|
||||
namespace: Some(s3bucket.clone().namespace().unwrap()),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
match create_configmap(cm_api.clone(), cm).await {
|
||||
Ok(cm) => cm,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Labeling the configmap");
|
||||
configmap = match label_configmap(cm_api.clone(), &s3bucket.name_any(), configmap).await {
|
||||
Ok(configmap) => configmap,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Setting owner references to the configmap");
|
||||
if s3bucket.spec.own_configmap {
|
||||
configmap = match own_configmap(cm_api.clone(), s3bucket.clone(), configmap).await {
|
||||
Ok(configmap) => configmap,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
if is_condition_true(status.clone().conditions, TYPE_BUCKET_READY) {
|
||||
let mut current_finalizers = match s3bucket.clone().metadata.finalizers {
|
||||
Some(finalizers) => finalizers,
|
||||
None => vec![],
|
||||
};
|
||||
if s3bucket.spec.cleanup {
|
||||
if !current_finalizers.contains(&FIN_CLEANUP.to_string()) {
|
||||
info!("Adding a finalizer");
|
||||
current_finalizers.push(FIN_CLEANUP.to_string());
|
||||
}
|
||||
} else {
|
||||
if current_finalizers.contains(&FIN_CLEANUP.to_string()) {
|
||||
if let Some(index) = current_finalizers
|
||||
.iter()
|
||||
.position(|x| *x == FIN_CLEANUP.to_string())
|
||||
{
|
||||
current_finalizers.remove(index);
|
||||
};
|
||||
}
|
||||
};
|
||||
s3bucket.metadata.finalizers = Some(current_finalizers);
|
||||
match s3bucket_api
|
||||
.replace(&s3bucket.name_any(), &PostParams::default(), &s3bucket)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
return Ok(Action::await_change());
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
info!("Getting the S3Intsance");
|
||||
let s3in = match s3in_api.get(&s3bucket.spec.instance).await {
|
||||
Ok(s3in) => s3in,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Updating the ConfigMap");
|
||||
if let Err(err) = ensure_data_configmap(cm_api.clone(), s3in.clone(), configmap.clone()).await {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
};
|
||||
|
||||
info!("Getting the s3instance secret");
|
||||
let secret_ns = s3in.clone().spec.credentials_secret.namespace;
|
||||
let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
|
||||
|
||||
let secret = match s3_instance::get_secret(secret_api.clone(), s3in.clone()).await {
|
||||
Ok(secret) => secret,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Getting data from the secret");
|
||||
// Getting data from the secret to initialize the clinet
|
||||
let data = match secret.data {
|
||||
Some(data) => data,
|
||||
None => {
|
||||
let err = anyhow::Error::msg("empty data");
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::InvalidSecret(err));
|
||||
}
|
||||
};
|
||||
|
||||
let access_key = match data.get(s3_instance::ACCESS_KEY) {
|
||||
Some(access_key) => String::from_utf8(access_key.0.clone()).unwrap(),
|
||||
None => {
|
||||
let err = anyhow::Error::msg("empty access key");
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::InvalidSecret(err));
|
||||
}
|
||||
};
|
||||
let secret_key = match data.get(s3_instance::SECRET_KEY) {
|
||||
Some(secret_key) => String::from_utf8(secret_key.0.clone()).unwrap(),
|
||||
None => {
|
||||
let err = anyhow::Error::msg("empty secret key");
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::InvalidSecret(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Creating an s3 client");
|
||||
let s3_client = S3Api::new(
|
||||
access_key,
|
||||
secret_key,
|
||||
s3in.clone().spec.endpoint.to_string(),
|
||||
s3in.clone().spec.region.to_string(),
|
||||
s3in.clone().spec.force_path_style,
|
||||
)
|
||||
.await;
|
||||
|
||||
let bucket_name = format!("{}-{}", s3bucket.namespace().unwrap(), s3bucket.name_any());
|
||||
|
||||
if s3bucket.metadata.deletion_timestamp.is_some() {
|
||||
info!("Object is marked for deletion");
|
||||
if let Some(mut finalizers) = s3bucket.clone().metadata.finalizers {
|
||||
if finalizers.contains(&FIN_CLEANUP.to_string()) {
|
||||
match s3_client.clone().delete_bucket(bucket_name.clone()).await {
|
||||
Ok(_) => {
|
||||
if let Some(index) = finalizers
|
||||
.iter()
|
||||
.position(|x| *x == FIN_CLEANUP.to_string())
|
||||
{
|
||||
finalizers.remove(index);
|
||||
};
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::IllegalS3Bucket);
|
||||
}
|
||||
}
|
||||
}
|
||||
s3bucket.metadata.finalizers = Some(finalizers);
|
||||
};
|
||||
match s3bucket_api
|
||||
.replace(&s3bucket.name_any(), &PostParams::default(), &s3bucket)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
return Ok(Action::await_change());
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Getting buckets");
|
||||
let buckets = match s3_client.clone().list_buckets().await {
|
||||
Ok(buckets) => buckets,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::IllegalS3Bucket);
|
||||
}
|
||||
};
|
||||
if buckets.contains(&bucket_name) {
|
||||
info!("Bucket already exists");
|
||||
} else {
|
||||
if let Err(err) = s3_client.clone().create_bucket(bucket_name.clone()).await {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::IllegalS3Bucket);
|
||||
}
|
||||
}
|
||||
|
||||
status.ready = true;
|
||||
status.conditions = set_condition(
|
||||
status.conditions,
|
||||
s3bucket.metadata.clone(),
|
||||
TYPE_BUCKET_READY,
|
||||
"True".to_string(),
|
||||
"Reconciled".to_string(),
|
||||
"Bucket is ready".to_string(),
|
||||
);
|
||||
status.endpoint = Some(s3in.clone().spec.endpoint);
|
||||
status.size = s3_client.clone().count_size(bucket_name.clone()).await.ok();
|
||||
status.total_objects = s3_client
|
||||
.clone()
|
||||
.count_objects(bucket_name.clone())
|
||||
.await
|
||||
.ok();
|
||||
status.region = Some(s3in.spec.region);
|
||||
status.bucket_name = Some(bucket_name.clone());
|
||||
s3bucket.status = Some(status);
|
||||
|
||||
info!("Updating status of the s3bucket resource");
|
||||
match s3bucket_api
|
||||
.replace_status(&s3bucket.name_any(), &PostParams::default(), &s3bucket)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
return Ok(Action::requeue(Duration::from_secs(120)));
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketError::KubeError(err));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Bootstrap the object by adding a default status to it
|
||||
async fn init_object(mut obj: S3Bucket, api: Api<S3Bucket>) -> Result<Action, S3BucketError> {
|
||||
let conditions = set_condition(
|
||||
vec![],
|
||||
obj.metadata.clone(),
|
||||
TYPE_BUCKET_READY,
|
||||
"Unknown".to_string(),
|
||||
"Reconciling".to_string(),
|
||||
"Reconciliation started".to_string(),
|
||||
);
|
||||
obj.status = Some(S3BucketStatus {
|
||||
conditions,
|
||||
..S3BucketStatus::default()
|
||||
});
|
||||
match api
|
||||
.replace_status(obj.clone().name_any().as_str(), &Default::default(), &obj)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(Action::await_change()),
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
Err(S3BucketError::KubeError(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the configmap with the bucket data
|
||||
async fn get_configmap(api: Api<ConfigMap>, name: &str) -> Result<ConfigMap, kube::Error> {
|
||||
info!("Getting a configmap: {}", name);
|
||||
match api.get(name).await {
|
||||
Ok(cm) => Ok(cm),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
// Create ConfigMap
|
||||
async fn create_configmap(api: Api<ConfigMap>, cm: ConfigMap) -> Result<ConfigMap, kube::Error> {
|
||||
match api.create(&PostParams::default(), &cm).await {
|
||||
Ok(cm) => get_configmap(api, &cm.name_any()).await,
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
async fn label_configmap(
|
||||
api: Api<ConfigMap>,
|
||||
s3bucket_name: &str,
|
||||
mut cm: ConfigMap,
|
||||
) -> Result<ConfigMap, kube::Error> {
|
||||
let mut labels = match &cm.clone().metadata.labels {
|
||||
Some(labels) => labels.clone(),
|
||||
None => {
|
||||
let map: BTreeMap<String, String> = BTreeMap::new();
|
||||
map
|
||||
}
|
||||
};
|
||||
labels.insert(CONFIGMAP_LABEL.to_string(), s3bucket_name.to_string());
|
||||
cm.metadata.labels = Some(labels);
|
||||
api.replace(&cm.name_any(), &PostParams::default(), &cm)
|
||||
.await?;
|
||||
|
||||
let cm = match api.get(&cm.name_any()).await {
|
||||
Ok(cm) => cm,
|
||||
Err(err) => {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
Ok(cm)
|
||||
}
|
||||
|
||||
async fn own_configmap(
|
||||
api: Api<ConfigMap>,
|
||||
s3bucket: S3Bucket,
|
||||
mut cm: ConfigMap,
|
||||
) -> Result<ConfigMap, kube::Error> {
|
||||
let mut owner_references = match &cm.clone().metadata.owner_references {
|
||||
Some(owner_references) => owner_references.clone(),
|
||||
None => {
|
||||
let owner_references: Vec<OwnerReference> = vec![];
|
||||
owner_references
|
||||
}
|
||||
};
|
||||
|
||||
if owner_references
|
||||
.iter()
|
||||
.find(|or| or.uid == s3bucket.uid().unwrap())
|
||||
.is_some()
|
||||
{
|
||||
return Ok(cm);
|
||||
}
|
||||
|
||||
let new_owner_reference = OwnerReference {
|
||||
api_version: S3Bucket::api_version(&()).into(),
|
||||
kind: S3Bucket::kind(&()).into(),
|
||||
name: s3bucket.name_any(),
|
||||
uid: s3bucket.uid().unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
owner_references.push(new_owner_reference);
|
||||
cm.metadata.owner_references = Some(owner_references);
|
||||
api.replace(&cm.name_any(), &PostParams::default(), &cm)
|
||||
.await?;
|
||||
|
||||
let cm = match api.get(&cm.name_any()).await {
|
||||
Ok(cm) => cm,
|
||||
Err(err) => {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
Ok(cm)
|
||||
}
|
||||
|
||||
async fn ensure_data_configmap(
|
||||
api: Api<ConfigMap>,
|
||||
s3in: S3Instance,
|
||||
mut cm: ConfigMap,
|
||||
) -> Result<ConfigMap, kube::Error> {
|
||||
let mut data = match &cm.clone().data {
|
||||
Some(data) => data.clone(),
|
||||
None => {
|
||||
let map: BTreeMap<String, String> = BTreeMap::new();
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
data.insert(AWS_REGION.to_string(), s3in.spec.region);
|
||||
data.insert(AWS_ENDPOINT_URL.to_string(), s3in.spec.endpoint);
|
||||
|
||||
cm.data = Some(data);
|
||||
api.replace(&cm.name_any(), &PostParams::default(), &cm)
|
||||
.await?;
|
||||
|
||||
match api.get(&cm.name_any()).await {
|
||||
Ok(cm) => Ok(cm),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn error_policy(_: Arc<S3Bucket>, _: &S3BucketError, _: Arc<Context>) -> Action {
|
||||
Action::requeue(Duration::from_secs(5 * 60))
|
||||
}
|
||||
|
||||
#[instrument(skip(client), fields(trace_id))]
|
||||
pub async fn run(client: Client) {
|
||||
let s3buckets = Api::<S3Bucket>::all(client.clone());
|
||||
if let Err(err) = s3buckets.list(&ListParams::default().limit(1)).await {
|
||||
error!("{}", err);
|
||||
std::process::exit(1);
|
||||
}
|
||||
let recorder = Recorder::new(client.clone(), "s3bucket-controller".into());
|
||||
let context = Context { client, recorder };
|
||||
Controller::new(s3buckets, Config::default().any_semantic())
|
||||
.shutdown_on_signal()
|
||||
.run(reconcile, error_policy, Arc::new(context))
|
||||
.filter_map(|x| async move { std::result::Result::ok(x) })
|
||||
.for_each(|_| futures::future::ready(()))
|
||||
.await;
|
||||
}
|
||||
// Context for our reconciler
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Context {
|
||||
/// Kubernetes client
|
||||
pub client: Client,
|
||||
/// Event recorder
|
||||
pub recorder: Recorder,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum S3BucketError {
|
||||
#[error("SerializationError: {0}")]
|
||||
SerializationError(#[source] serde_json::Error),
|
||||
|
||||
#[error("Kube Error: {0}")]
|
||||
KubeError(#[source] kube::Error),
|
||||
|
||||
#[error("Finalizer Error: {0}")]
|
||||
// NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
|
||||
// so boxing this error to break cycles
|
||||
FinalizerError(#[source] Box<kube::runtime::finalizer::Error<S3BucketError>>),
|
||||
|
||||
#[error("IllegalS3Bucket")]
|
||||
IllegalS3Bucket,
|
||||
|
||||
#[error("SecretIsAlreadyLabeled")]
|
||||
SecretIsAlreadyLabeled,
|
||||
|
||||
#[error("Invalid Secret: {0}")]
|
||||
InvalidSecret(#[source] anyhow::Error),
|
||||
}
|
||||
|
||||
pub type S3BucketResult<T, E = S3BucketError> = std::result::Result<T, E>;
|
||||
496
operator/src/controllers/s3_bucket_user.rs
Normal file
496
operator/src/controllers/s3_bucket_user.rs
Normal file
@@ -0,0 +1,496 @@
|
||||
use crate::conditions::{is_condition_true, set_condition};
|
||||
use crate::controllers::s3_instance;
|
||||
use crate::providers::{ProviderAPI, SupportedProvider};
|
||||
use crate::providers::rustfs::RustFS;
|
||||
use crate::s3::s3api::S3Api;
|
||||
use api::api::v1beta1::s3_bucket::S3Bucket;
|
||||
use api::api::v1beta1::s3_bucket_user::{S3BucketUser, S3BucketUserStatus};
|
||||
use api::api::v1beta1::s3_instance::S3Instance;
|
||||
use futures::StreamExt;
|
||||
use k8s_openapi::ByteString;
|
||||
use k8s_openapi::api::core::v1::{Pod, Secret};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
|
||||
use kube::api::{ListParams, ObjectMeta, PostParams};
|
||||
use kube::runtime::Controller;
|
||||
use kube::runtime::controller::Action;
|
||||
use kube::runtime::events::Recorder;
|
||||
use kube::runtime::watcher::Config;
|
||||
use kube::{Api, Client, Error, Resource, ResourceExt};
|
||||
use rand::RngExt;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tracing::*;
|
||||
|
||||
const TYPE_USER_READY: &str = "UserReady";
|
||||
const FIN_CLEANUP: &str = "s3.badhouseplants.net/user-cleanup";
|
||||
const SECRET_LABEL: &str = "s3.badhouseplants.net/s3-bucket";
|
||||
|
||||
const AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
|
||||
const AWS_SECCRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";
|
||||
|
||||
const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ\
|
||||
abcdefghijklmnopqrstuvwxyz\
|
||||
0123456789)(*&^%$#@!~";
|
||||
const PASSWORD_LEN: usize = 40;
|
||||
|
||||
#[instrument(skip(ctx, obj), fields(trace_id))]
|
||||
pub(crate) async fn reconcile(
|
||||
obj: Arc<S3BucketUser>,
|
||||
ctx: Arc<Context>,
|
||||
) -> S3BucketUserResult<Action> {
|
||||
info!("Staring reconciling");
|
||||
let s3bucketuser_api: Api<S3BucketUser> =
|
||||
Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
|
||||
let s3bucket_api: Api<S3Bucket> =
|
||||
Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
|
||||
let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
|
||||
let s3in_api: Api<S3Instance> = Api::all(ctx.client.clone());
|
||||
info!("Getting the S3BucketUser resource");
|
||||
let mut s3bucketuser = match s3bucketuser_api.get(&obj.name_any()).await {
|
||||
Ok(s3bucketuser) => s3bucketuser,
|
||||
Err(Error::Api(ae)) if ae.code == 404 => {
|
||||
info!("Object is not found, probably removed");
|
||||
return Ok(Action::await_change());
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
// On the first reconciliation status is None
|
||||
// it needs to be initialized
|
||||
let mut status = match s3bucketuser.clone().status {
|
||||
None => {
|
||||
info!("Status is not yet set, initializing the object");
|
||||
return init_object(s3bucketuser, s3bucketuser_api).await;
|
||||
}
|
||||
Some(status) => status,
|
||||
};
|
||||
|
||||
let secret_name = format!("{}-bucket-creds", s3bucketuser.name_any());
|
||||
|
||||
info!("Getting the secret");
|
||||
// Get the secret, if it's already there, we need to validate, or create an empty one
|
||||
let mut secret = match get_secret(secret_api.clone(), &secret_name).await {
|
||||
Ok(secret) => secret,
|
||||
Err(Error::Api(ae)) if ae.code == 404 => {
|
||||
info!("Secret is not found, creating a new one");
|
||||
let secret = Secret {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(secret_name),
|
||||
namespace: Some(s3bucketuser.clone().namespace().unwrap()),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
match create_secret(secret_api.clone(), secret).await {
|
||||
Ok(cm) => cm,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Labeling the secret");
|
||||
secret = match label_secret(secret_api.clone(), &s3bucketuser.name_any(), secret).await {
|
||||
Ok(configmap) => configmap,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Setting owner references to the secret");
|
||||
if s3bucketuser.spec.own_secret {
|
||||
secret = match own_secret(secret_api.clone(), s3bucketuser.clone(), secret).await {
|
||||
Ok(secret) => secret,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
if is_condition_true(status.clone().conditions, TYPE_USER_READY) {
|
||||
let mut current_finalizers = match s3bucketuser.clone().metadata.finalizers {
|
||||
Some(finalizers) => finalizers,
|
||||
None => vec![],
|
||||
};
|
||||
if s3bucketuser.spec.cleanup {
|
||||
if !current_finalizers.contains(&FIN_CLEANUP.to_string()) {
|
||||
info!("Adding a finalizer");
|
||||
current_finalizers.push(FIN_CLEANUP.to_string());
|
||||
}
|
||||
} else {
|
||||
if current_finalizers.contains(&FIN_CLEANUP.to_string()) {
|
||||
if let Some(index) = current_finalizers
|
||||
.iter()
|
||||
.position(|x| *x == FIN_CLEANUP.to_string())
|
||||
{
|
||||
current_finalizers.remove(index);
|
||||
};
|
||||
}
|
||||
};
|
||||
s3bucketuser.metadata.finalizers = Some(current_finalizers);
|
||||
if let Err(err) = s3bucketuser_api
|
||||
.replace(
|
||||
&s3bucketuser.name_any(),
|
||||
&PostParams::default(),
|
||||
&s3bucketuser,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Getting the S3Bucket");
|
||||
let s3bucket = match s3bucket_api.get(&s3bucketuser.spec.bucket).await {
|
||||
Ok(s3bucket) => s3bucket,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Getting the S3Intsance");
|
||||
let s3in = match s3in_api.get(&s3bucket.spec.instance).await {
|
||||
Ok(s3in) => s3in,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Getting the s3instance secret");
|
||||
let secret_ns = s3in.clone().spec.credentials_secret.namespace;
|
||||
let s3in_secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
|
||||
let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), &secret_ns);
|
||||
|
||||
let s3in_secret = match s3_instance::get_secret(s3in_secret_api.clone(), s3in.clone()).await {
|
||||
Ok(secret) => secret,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Getting data from the secret");
|
||||
// Getting data from the secret to initialize the clinet
|
||||
let data = match s3in_secret.data {
|
||||
Some(data) => data,
|
||||
None => {
|
||||
let err = anyhow::Error::msg("empty data");
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::InvalidSecret(err));
|
||||
}
|
||||
};
|
||||
|
||||
let access_key = match data.get(s3_instance::ACCESS_KEY) {
|
||||
Some(access_key) => String::from_utf8(access_key.0.clone()).unwrap(),
|
||||
None => {
|
||||
let err = anyhow::Error::msg("empty access key");
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::InvalidSecret(err));
|
||||
}
|
||||
};
|
||||
let secret_key = match data.get(s3_instance::SECRET_KEY) {
|
||||
Some(secret_key) => String::from_utf8(secret_key.0.clone()).unwrap(),
|
||||
None => {
|
||||
let err = anyhow::Error::msg("empty secret key");
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::InvalidSecret(err));
|
||||
}
|
||||
};
|
||||
|
||||
let username = format!(
|
||||
"{}-{}",
|
||||
s3bucketuser.namespace().unwrap(),
|
||||
s3bucketuser.name_any()
|
||||
);
|
||||
let password = generate_password();
|
||||
|
||||
let provider: SupportedProvider = match s3in.clone().spec.provider {
|
||||
api::api::v1beta1::s3_instance::Provider::Minio => todo!(),
|
||||
api::api::v1beta1::s3_instance::Provider::Rustfs => SupportedProvider::RustFS(RustFS::new(access_key.clone(), secret_key.clone(), s3in.spec.endpoint.clone(), s3in.spec.region.clone())),
|
||||
};
|
||||
|
||||
info!("Creating a user");
|
||||
if let Err(err) = provider.create_user(username.clone(), password.clone()) {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::IllegalS3BucketUser);
|
||||
}
|
||||
|
||||
info!("Creating an s3 client");
|
||||
let s3_client = S3Api::new(
|
||||
access_key,
|
||||
secret_key,
|
||||
s3in.clone().spec.endpoint.to_string(),
|
||||
s3in.clone().spec.region.to_string(),
|
||||
s3in.clone().spec.force_path_style,
|
||||
)
|
||||
.await;
|
||||
secret = match ensure_data_secret(secret_api, secret, username.clone(), password.clone()).await
|
||||
{
|
||||
Ok(secret) => secret,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
if s3bucketuser.metadata.deletion_timestamp.is_some() {
|
||||
todo!();
|
||||
}
|
||||
|
||||
status.ready = true;
|
||||
status.conditions = set_condition(
|
||||
status.conditions,
|
||||
s3bucket.metadata.clone(),
|
||||
TYPE_USER_READY,
|
||||
"True".to_string(),
|
||||
"Reconciled".to_string(),
|
||||
"User is ready".to_string(),
|
||||
);
|
||||
status.access_key = Some(username.clone());
|
||||
s3bucketuser.status = Some(status);
|
||||
|
||||
info!("Updating status of the s3bucket user resource");
|
||||
match s3bucketuser_api
|
||||
.replace_status(
|
||||
&s3bucketuser.name_any(),
|
||||
&PostParams::default(),
|
||||
&s3bucketuser,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
return Ok(Action::requeue(Duration::from_secs(120)));
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3BucketUserError::KubeError(err));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Bootstrap the object by adding a default status to it
|
||||
async fn init_object(
|
||||
mut obj: S3BucketUser,
|
||||
api: Api<S3BucketUser>,
|
||||
) -> Result<Action, S3BucketUserError> {
|
||||
let conditions = set_condition(
|
||||
vec![],
|
||||
obj.metadata.clone(),
|
||||
TYPE_USER_READY,
|
||||
"Unknown".to_string(),
|
||||
"Reconciling".to_string(),
|
||||
"Reconciliation started".to_string(),
|
||||
);
|
||||
obj.status = Some(S3BucketUserStatus {
|
||||
conditions,
|
||||
..S3BucketUserStatus::default()
|
||||
});
|
||||
match api
|
||||
.replace_status(obj.clone().name_any().as_str(), &Default::default(), &obj)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(Action::await_change()),
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
Err(S3BucketUserError::KubeError(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the configmap with the bucket data
|
||||
async fn get_secret(api: Api<Secret>, name: &str) -> Result<Secret, kube::Error> {
|
||||
info!("Getting a secret: {}", name);
|
||||
match api.get(name).await {
|
||||
Ok(cm) => Ok(cm),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
// Create ConfigMap
|
||||
async fn create_secret(api: Api<Secret>, secret: Secret) -> Result<Secret, kube::Error> {
|
||||
match api.create(&PostParams::default(), &secret).await {
|
||||
Ok(secret) => get_secret(api, &secret.name_any()).await,
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
async fn label_secret(
|
||||
api: Api<Secret>,
|
||||
s3bucket_name: &str,
|
||||
mut secret: Secret,
|
||||
) -> Result<Secret, kube::Error> {
|
||||
let mut labels = match &secret.clone().metadata.labels {
|
||||
Some(labels) => labels.clone(),
|
||||
None => {
|
||||
let map: BTreeMap<String, String> = BTreeMap::new();
|
||||
map
|
||||
}
|
||||
};
|
||||
labels.insert(SECRET_LABEL.to_string(), s3bucket_name.to_string());
|
||||
secret.metadata.labels = Some(labels);
|
||||
api.replace(&secret.name_any(), &PostParams::default(), &secret)
|
||||
.await?;
|
||||
|
||||
let secret = match api.get(&secret.name_any()).await {
|
||||
Ok(secret) => secret,
|
||||
Err(err) => {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
Ok(secret)
|
||||
}
|
||||
|
||||
async fn own_secret(
|
||||
api: Api<Secret>,
|
||||
s3bucketuser: S3BucketUser,
|
||||
mut secret: Secret,
|
||||
) -> Result<Secret, kube::Error> {
|
||||
let mut owner_references = match &secret.clone().metadata.owner_references {
|
||||
Some(owner_references) => owner_references.clone(),
|
||||
None => {
|
||||
let owner_references: Vec<OwnerReference> = vec![];
|
||||
owner_references
|
||||
}
|
||||
};
|
||||
|
||||
if owner_references
|
||||
.iter()
|
||||
.find(|or| or.uid == s3bucketuser.uid().unwrap())
|
||||
.is_some()
|
||||
{
|
||||
return Ok(secret);
|
||||
}
|
||||
|
||||
let new_owner_reference = OwnerReference {
|
||||
api_version: S3Bucket::api_version(&()).into(),
|
||||
kind: S3Bucket::kind(&()).into(),
|
||||
name: s3bucketuser.name_any(),
|
||||
uid: s3bucketuser.uid().unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
owner_references.push(new_owner_reference);
|
||||
secret.metadata.owner_references = Some(owner_references);
|
||||
api.replace(&secret.name_any(), &PostParams::default(), &secret)
|
||||
.await?;
|
||||
|
||||
let secret = match api.get(&secret.name_any()).await {
|
||||
Ok(secret) => secret,
|
||||
Err(err) => {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
Ok(secret)
|
||||
}
|
||||
|
||||
async fn ensure_data_secret(
|
||||
api: Api<Secret>,
|
||||
mut secret: Secret,
|
||||
username: String,
|
||||
password: String,
|
||||
) -> Result<Secret, kube::Error> {
|
||||
let mut data = match &secret.clone().data {
|
||||
Some(data) => data.clone(),
|
||||
None => {
|
||||
let map: BTreeMap<String, ByteString> = BTreeMap::new();
|
||||
map
|
||||
}
|
||||
};
|
||||
|
||||
data.insert(
|
||||
AWS_ACCESS_KEY_ID.to_string(),
|
||||
ByteString(username.as_bytes().to_vec()),
|
||||
);
|
||||
data.insert(
|
||||
AWS_SECCRET_ACCESS_KEY.to_string(),
|
||||
ByteString(password.as_bytes().to_vec()),
|
||||
);
|
||||
|
||||
secret.data = Some(data);
|
||||
api.replace(&secret.name_any(), &PostParams::default(), &secret)
|
||||
.await?;
|
||||
|
||||
api.get(&secret.name_any()).await
|
||||
}
|
||||
|
||||
fn generate_password() -> String {
|
||||
let mut rng = rand::rng();
|
||||
let password: String = (0..PASSWORD_LEN)
|
||||
.map(|_| {
|
||||
let idx = rng.random_range(0..CHARSET.len());
|
||||
char::from(CHARSET[idx])
|
||||
})
|
||||
.collect();
|
||||
password
|
||||
}
|
||||
|
||||
pub(crate) fn error_policy(_: Arc<S3BucketUser>, _: &S3BucketUserError, _: Arc<Context>) -> Action {
|
||||
Action::requeue(Duration::from_secs(5 * 60))
|
||||
}
|
||||
|
||||
#[instrument(skip(client), fields(trace_id))]
|
||||
pub async fn run(client: Client) {
|
||||
let s3bucketusers = Api::<S3BucketUser>::all(client.clone());
|
||||
if let Err(err) = s3bucketusers.list(&ListParams::default().limit(1)).await {
|
||||
error!("{}", err);
|
||||
std::process::exit(1);
|
||||
}
|
||||
let recorder = Recorder::new(client.clone(), "s3bucketuser-controller".into());
|
||||
let context = Context { client, recorder };
|
||||
Controller::new(s3bucketusers, Config::default().any_semantic())
|
||||
.shutdown_on_signal()
|
||||
.run(reconcile, error_policy, Arc::new(context))
|
||||
.filter_map(|x| async move { std::result::Result::ok(x) })
|
||||
.for_each(|_| futures::future::ready(()))
|
||||
.await;
|
||||
}
|
||||
|
||||
// Context for our reconciler
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Context {
|
||||
/// Kubernetes client
|
||||
pub client: Client,
|
||||
/// Event recorder
|
||||
pub recorder: Recorder,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum S3BucketUserError {
|
||||
#[error("SerializationError: {0}")]
|
||||
SerializationError(#[source] serde_json::Error),
|
||||
|
||||
#[error("Kube Error: {0}")]
|
||||
KubeError(#[source] kube::Error),
|
||||
|
||||
#[error("Finalizer Error: {0}")]
|
||||
// NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
|
||||
// so boxing this error to break cycles
|
||||
FinalizerError(#[source] Box<kube::runtime::finalizer::Error<S3BucketUserError>>),
|
||||
|
||||
#[error("IllegalS3BucketUser")]
|
||||
IllegalS3BucketUser,
|
||||
|
||||
#[error("SecretIsAlreadyLabeled")]
|
||||
SecretIsAlreadyLabeled,
|
||||
|
||||
#[error("Invalid Secret: {0}")]
|
||||
InvalidSecret(#[source] anyhow::Error),
|
||||
}
|
||||
|
||||
pub type S3BucketUserResult<T, E = S3BucketUserError> = std::result::Result<T, E>;
|
||||
456
operator/src/controllers/s3_instance.rs
Normal file
456
operator/src/controllers/s3_instance.rs
Normal file
@@ -0,0 +1,456 @@
|
||||
use crate::conditions::{is_condition_true, is_condition_unknown, set_condition};
|
||||
use crate::s3::S3Client;
|
||||
use crate::s3::s3api::S3Api;
|
||||
use api::api::v1beta1::s3_instance::{S3Instance, S3InstanceStatus};
|
||||
use futures::StreamExt;
|
||||
use k8s_openapi::api::core::v1::Secret;
|
||||
use kube::api::{ListParams, PostParams};
|
||||
use kube::runtime::Controller;
|
||||
use kube::runtime::controller::Action;
|
||||
use kube::runtime::events::{Event, Recorder};
|
||||
use kube::runtime::watcher::Config;
|
||||
use kube::{Api, Client, Error, Resource, ResourceExt};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tracing::*;
|
||||
|
||||
const TYPE_CONNECTED: &str = "Connected";
|
||||
const TYPE_SECRET_LABELED: &str = "SecretLabeled";
|
||||
const FIN_SECRET_LABEL: &str = "s3.badhouseplants.net/s3-label";
|
||||
const SECRET_LABEL: &str = "s3.badhouseplants.net/s3-instance";
|
||||
pub(crate) const ACCESS_KEY: &str = "ACCESS_KEY";
|
||||
pub(crate) const SECRET_KEY: &str = "SECRET_KEY";
|
||||
|
||||
#[instrument(skip(ctx, obj), fields(trace_id))]
|
||||
pub(crate) async fn reconcile(obj: Arc<S3Instance>, ctx: Arc<Context>) -> S3InstanceResult<Action> {
|
||||
info!("Staring reconciling");
|
||||
info!("Getting the S3Instance resource");
|
||||
let s3_api: Api<S3Instance> = Api::all(ctx.client.clone());
|
||||
let mut s3in = match s3_api.get(obj.name_any().as_str()).await {
|
||||
Ok(res) => res,
|
||||
Err(Error::Api(ae)) if ae.code == 404 => {
|
||||
info!("Object is not found, probably removed");
|
||||
return Ok(Action::await_change());
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
let secret_ns = s3in.clone().spec.credentials_secret.namespace;
|
||||
let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
|
||||
|
||||
let mut status = match s3in.clone().status {
|
||||
None => {
|
||||
info!("Status is not yet set, initializing the object");
|
||||
return init_object(s3in, s3_api).await;
|
||||
}
|
||||
Some(status) => status,
|
||||
};
|
||||
|
||||
let secret = match get_secret(secret_api.clone(), s3in.clone()).await {
|
||||
Ok(secret) => secret,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
ctx.recorder
|
||||
.publish(
|
||||
&Event {
|
||||
type_: kube::runtime::events::EventType::Warning,
|
||||
reason: "S3InstanceReconciliation".to_string(),
|
||||
note: Some("Secret wasn't found".to_string()),
|
||||
action: "SecretLookUp".to_string(),
|
||||
secondary: None,
|
||||
},
|
||||
&s3in.clone().object_ref(&()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
return Err(S3InstanceError::KubeError(err));
|
||||
}
|
||||
};
|
||||
|
||||
if s3in.metadata.deletion_timestamp.is_some() {
|
||||
info!("Object is marked for deletion");
|
||||
if let Some(mut finalizers) = s3in.clone().metadata.finalizers {
|
||||
if finalizers.contains(&FIN_SECRET_LABEL.to_string()) {
|
||||
match unlabel_secret(ctx.clone(), s3in.clone(), secret).await {
|
||||
Ok(_) => {
|
||||
if let Some(index) = finalizers
|
||||
.iter()
|
||||
.position(|x| *x == FIN_SECRET_LABEL.to_string())
|
||||
{
|
||||
finalizers.remove(index);
|
||||
};
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::KubeError(err));
|
||||
}
|
||||
};
|
||||
}
|
||||
s3in.metadata.finalizers = Some(finalizers);
|
||||
};
|
||||
match s3_api
|
||||
.replace(&s3in.name_any(), &PostParams::default(), &s3in)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
return Ok(Action::await_change());
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::KubeError(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if is_condition_true(status.clone().conditions, TYPE_SECRET_LABELED) {
|
||||
let mut current_finalizers = match s3in.clone().metadata.finalizers {
|
||||
Some(finalizers) => finalizers,
|
||||
None => vec![],
|
||||
};
|
||||
|
||||
if !current_finalizers.contains(&FIN_SECRET_LABEL.to_string()) {
|
||||
info!("Adding a finalizer");
|
||||
current_finalizers.push(FIN_SECRET_LABEL.to_string());
|
||||
|
||||
s3in.metadata.finalizers = Some(current_finalizers);
|
||||
match s3_api
|
||||
.replace(&s3in.name_any(), &PostParams::default(), &s3in)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
return Ok(Action::await_change());
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::KubeError(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if is_condition_unknown(status.clone().conditions, TYPE_SECRET_LABELED) {
|
||||
if is_secret_labeled(secret.clone()) {
|
||||
if is_secret_labeled_by_another_obj(s3in.clone(), secret.clone()) {
|
||||
error!("{}", S3InstanceError::SecretIsAlreadyLabeled);
|
||||
return Err(S3InstanceError::SecretIsAlreadyLabeled);
|
||||
}
|
||||
if is_secret_labeled_by_obj(s3in.clone(), secret.clone()) {
|
||||
info!("Secret is already labeled");
|
||||
status.conditions = set_condition(
|
||||
status.clone().conditions,
|
||||
obj.metadata.clone(),
|
||||
TYPE_SECRET_LABELED,
|
||||
"True".to_string(),
|
||||
"Reconciled".to_string(),
|
||||
"Secret is already labeled".to_string(),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
info!("Labeling the secret");
|
||||
if let Err(err) = label_secret(ctx.clone(), s3in.clone(), secret).await {
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::KubeError(err));
|
||||
};
|
||||
status.conditions = set_condition(
|
||||
status.clone().conditions,
|
||||
obj.metadata.clone(),
|
||||
TYPE_SECRET_LABELED,
|
||||
"True".to_string(),
|
||||
"Reconciled".to_string(),
|
||||
"Secret is labeled".to_string(),
|
||||
);
|
||||
};
|
||||
s3in.status = Some(status);
|
||||
match s3_api
|
||||
.replace_status(&s3in.name_any(), &PostParams::default(), &s3in)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
return Ok(Action::await_change());
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::KubeError(err));
|
||||
}
|
||||
}
|
||||
};
|
||||
info!("Checking if the secret is labeled by another object");
|
||||
if !is_secret_labeled_by_obj(s3in.clone(), secret.clone()) {
|
||||
status.conditions = set_condition(
|
||||
status.conditions,
|
||||
s3in.clone().metadata,
|
||||
TYPE_SECRET_LABELED,
|
||||
"Unknown".to_string(),
|
||||
"S3InstanceReconciliation".to_string(),
|
||||
"Secret is not labeled".to_string(),
|
||||
);
|
||||
s3in.status = Some(status);
|
||||
match s3_api
|
||||
.replace_status(&s3in.clone().name_any(), &PostParams::default(), &s3in)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
return Ok(Action::await_change());
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::KubeError(err));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
info!("Getting data from the secret");
|
||||
// Getting data from the secret to initialize the clinet
|
||||
let data = match secret.data {
|
||||
Some(data) => data,
|
||||
None => {
|
||||
let err = anyhow::Error::msg("empty data");
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::InvalidSecret(err));
|
||||
}
|
||||
};
|
||||
|
||||
let access_key = match data.get(ACCESS_KEY) {
|
||||
Some(access_key) => String::from_utf8(access_key.0.clone()).unwrap(),
|
||||
None => {
|
||||
let err = anyhow::Error::msg("empty access key");
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::InvalidSecret(err));
|
||||
}
|
||||
};
|
||||
let secret_key = match data.get(SECRET_KEY) {
|
||||
Some(secret_key) => String::from_utf8(secret_key.0.clone()).unwrap(),
|
||||
None => {
|
||||
let err = anyhow::Error::msg("empty secret key");
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::InvalidSecret(err));
|
||||
}
|
||||
};
|
||||
|
||||
info!("Creating an s3 client");
|
||||
let s3_client = S3Api::new(
|
||||
access_key,
|
||||
secret_key,
|
||||
s3in.clone().spec.endpoint.to_string(),
|
||||
s3in.clone().spec.region.to_string(),
|
||||
s3in.clone().spec.force_path_style,
|
||||
)
|
||||
.await;
|
||||
|
||||
info!("Getting buckets");
|
||||
let buckets = match s3_client.list_buckets().await {
|
||||
Ok(buckets) => buckets,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::IllegalS3Instance);
|
||||
}
|
||||
};
|
||||
|
||||
status.ready = true;
|
||||
status.buckets = Some(buckets.clone());
|
||||
status.total_buckets = Some(buckets.len());
|
||||
s3in.status = Some(status);
|
||||
|
||||
info!("Updating status of the s3in resource");
|
||||
match s3_api
|
||||
.replace_status(&s3in.name_any(), &PostParams::default(), &s3in)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
return Ok(Action::requeue(Duration::from_secs(120)));
|
||||
}
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
return Err(S3InstanceError::IllegalS3Instance);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Bootstrap the object by adding a default status to it
|
||||
async fn init_object(mut obj: S3Instance, api: Api<S3Instance>) -> Result<Action, S3InstanceError> {
|
||||
let mut conditions = set_condition(
|
||||
vec![],
|
||||
obj.metadata.clone(),
|
||||
TYPE_CONNECTED,
|
||||
"Unknown".to_string(),
|
||||
"Reconciling".to_string(),
|
||||
"Reconciliation started".to_string(),
|
||||
);
|
||||
conditions = set_condition(
|
||||
conditions,
|
||||
obj.metadata.clone(),
|
||||
TYPE_SECRET_LABELED,
|
||||
"Unknown".to_string(),
|
||||
"Reconciling".to_string(),
|
||||
"Reconciliation started".to_string(),
|
||||
);
|
||||
let ready = false;
|
||||
let buckets = None;
|
||||
let total_buckets = None;
|
||||
obj.status = Some(S3InstanceStatus {
|
||||
ready,
|
||||
conditions,
|
||||
buckets,
|
||||
total_buckets,
|
||||
});
|
||||
match api
|
||||
.replace_status(obj.clone().name_any().as_str(), &Default::default(), &obj)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(Action::await_change()),
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
Err(S3InstanceError::KubeError(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the secret with credentials
|
||||
pub(crate) async fn get_secret(api: Api<Secret>, obj: S3Instance) -> Result<Secret, kube::Error> {
|
||||
let secret = match api.get(&obj.spec.credentials_secret.name).await {
|
||||
Ok(secret) => secret,
|
||||
Err(err) => {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(secret)
|
||||
}
|
||||
|
||||
async fn unlabel_secret(
|
||||
ctx: Arc<Context>,
|
||||
obj: S3Instance,
|
||||
mut secret: Secret,
|
||||
) -> Result<(), kube::Error> {
|
||||
let secret_ns = obj.clone().spec.credentials_secret.namespace;
|
||||
let api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
|
||||
if let Some(mut labels) = secret.clone().metadata.labels {
|
||||
labels.remove(&SECRET_LABEL.to_string());
|
||||
secret.metadata.labels = Some(labels);
|
||||
api.replace(&secret.name_any(), &PostParams::default(), &secret)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn label_secret(
|
||||
ctx: Arc<Context>,
|
||||
obj: S3Instance,
|
||||
mut secret: Secret,
|
||||
) -> Result<Secret, kube::Error> {
|
||||
let secret_ns = obj.clone().spec.credentials_secret.namespace;
|
||||
let api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
|
||||
|
||||
secret
|
||||
.clone()
|
||||
.metadata
|
||||
.labels
|
||||
.get_or_insert_with(BTreeMap::new)
|
||||
.insert(SECRET_LABEL.to_string(), obj.name_any());
|
||||
|
||||
let mut labels = match &secret.clone().metadata.labels {
|
||||
Some(labels) => labels.clone(),
|
||||
None => {
|
||||
let map: BTreeMap<String, String> = BTreeMap::new();
|
||||
map
|
||||
}
|
||||
};
|
||||
labels.insert(SECRET_LABEL.to_string(), obj.name_any());
|
||||
secret.metadata.labels = Some(labels);
|
||||
api.replace(&secret.name_any(), &PostParams::default(), &secret)
|
||||
.await?;
|
||||
|
||||
let secret = match api.get(&obj.spec.credentials_secret.name).await {
|
||||
Ok(secret) => secret,
|
||||
Err(err) => {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
Ok(secret)
|
||||
}
|
||||
|
||||
// Checks whether a secret ia already labeled by the operator
|
||||
fn is_secret_labeled(secret: Secret) -> bool {
|
||||
match secret.metadata.labels {
|
||||
Some(labels) => labels.get_key_value(SECRET_LABEL).is_some(),
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
// Checks whether a secret is already labeled by another object
|
||||
fn is_secret_labeled_by_another_obj(obj: S3Instance, secret: Secret) -> bool {
|
||||
match secret.metadata.labels {
|
||||
Some(labels) => labels
|
||||
.get(SECRET_LABEL)
|
||||
.is_some_and(|label| label != &obj.name_any()),
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
// Checks whether a secret is already labeled by this object
|
||||
fn is_secret_labeled_by_obj(obj: S3Instance, secret: Secret) -> bool {
|
||||
match secret.metadata.labels {
|
||||
Some(labels) => labels
|
||||
.get(SECRET_LABEL)
|
||||
.is_some_and(|label| label == &obj.name_any()),
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn error_policy(_: Arc<S3Instance>, _: &S3InstanceError, _: Arc<Context>) -> Action {
|
||||
Action::requeue(Duration::from_secs(5 * 60))
|
||||
}
|
||||
|
||||
#[instrument(skip(client), fields(trace_id))]
|
||||
pub async fn run(client: Client) {
|
||||
let s3instances = Api::<S3Instance>::all(client.clone());
|
||||
if let Err(err) = s3instances.list(&ListParams::default().limit(1)).await {
|
||||
error!("{}", err);
|
||||
std::process::exit(1);
|
||||
}
|
||||
let recorder = Recorder::new(client.clone(), "s3instance-controller".into());
|
||||
let context = Context { client, recorder };
|
||||
Controller::new(s3instances, Config::default().any_semantic())
|
||||
.shutdown_on_signal()
|
||||
.run(reconcile, error_policy, Arc::new(context))
|
||||
.filter_map(|x| async move { std::result::Result::ok(x) })
|
||||
.for_each(|_| futures::future::ready(()))
|
||||
.await;
|
||||
}
|
||||
|
||||
// Context for our reconciler
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Context {
|
||||
/// Kubernetes client
|
||||
pub client: Client,
|
||||
/// Event recorder
|
||||
pub recorder: Recorder,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum S3InstanceError {
|
||||
#[error("SerializationError: {0}")]
|
||||
SerializationError(#[source] serde_json::Error),
|
||||
|
||||
#[error("Kube Error: {0}")]
|
||||
KubeError(#[source] kube::Error),
|
||||
|
||||
#[error("Finalizer Error: {0}")]
|
||||
// NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
|
||||
// so boxing this error to break cycles
|
||||
FinalizerError(#[source] Box<kube::runtime::finalizer::Error<S3InstanceError>>),
|
||||
|
||||
#[error("IllegalS3Instance")]
|
||||
IllegalS3Instance,
|
||||
|
||||
#[error("SecretIsAlreadyLabeled")]
|
||||
SecretIsAlreadyLabeled,
|
||||
|
||||
#[error("Invalid Secret: {0}")]
|
||||
InvalidSecret(#[source] anyhow::Error),
|
||||
}
|
||||
|
||||
pub type S3InstanceResult<T, E = S3InstanceError> = std::result::Result<T, E>;
|
||||
14
operator/src/crdgen.rs
Normal file
14
operator/src/crdgen.rs
Normal file
@@ -0,0 +1,14 @@
|
||||
use api::api::v1beta1::s3_bucket_user::S3BucketUser;
|
||||
use api::api::v1beta1::{s3_bucket::S3Bucket, s3_instance::S3Instance};
|
||||
use kube::CustomResourceExt;
|
||||
fn main() {
|
||||
println!(
|
||||
"---\n{}",
|
||||
serde_yaml::to_string(&S3Instance::crd()).unwrap()
|
||||
);
|
||||
println!("---\n{}", serde_yaml::to_string(&S3Bucket::crd()).unwrap());
|
||||
println!(
|
||||
"---\n{}",
|
||||
serde_yaml::to_string(&S3BucketUser::crd()).unwrap()
|
||||
);
|
||||
}
|
||||
1
operator/src/lib.rs
Normal file
1
operator/src/lib.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod api;
|
||||
29
operator/src/providers/mod.rs
Normal file
29
operator/src/providers/mod.rs
Normal file
@@ -0,0 +1,29 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::providers::rustfs::RustFS;
|
||||
|
||||
pub(crate) mod rustfs;
|
||||
|
||||
pub(crate) enum SupportedProvider {
|
||||
RustFS(RustFS),
|
||||
}
|
||||
|
||||
pub(crate) trait ProviderAPI {
|
||||
fn create_user(&self, access_key: String, secret_key: String) -> Result<(), anyhow::Error>;
|
||||
fn update_user(&self) -> Result<(), anyhow::Error>;
|
||||
fn delete_user(&self) -> Result<(), anyhow::Error>;
|
||||
}
|
||||
|
||||
impl ProviderAPI for SupportedProvider {
|
||||
fn create_user(&self, access_key: String, secret_key: String) -> Result<(), anyhow::Error> {
|
||||
match self {
|
||||
SupportedProvider::RustFS(rust_fs) => rust_fs.create_user(access_key, secret_key),
|
||||
}
|
||||
}
|
||||
fn update_user(&self) -> Result<(), anyhow::Error> {
|
||||
todo!()
|
||||
}
|
||||
fn delete_user(&self) -> Result<(), anyhow::Error> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
73
operator/src/providers/rustfs.rs
Normal file
73
operator/src/providers/rustfs.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
use std::process::Command;
|
||||
|
||||
use super::ProviderAPI;
|
||||
use async_trait::async_trait;
|
||||
use reqwest::Client;
|
||||
use tracing::info;
|
||||
|
||||
pub(crate) struct RustFS {
|
||||
username: String,
|
||||
password: String,
|
||||
endpoint: String,
|
||||
region: String,
|
||||
}
|
||||
|
||||
impl RustFS {
|
||||
pub(crate) fn new(
|
||||
username: String,
|
||||
password: String,
|
||||
endpoint: String,
|
||||
region: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
username,
|
||||
region,
|
||||
password,
|
||||
endpoint,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ProviderAPI for RustFS {
|
||||
fn create_user(&self, access_key: String, secret_key: String) -> Result<(), anyhow::Error> {
|
||||
info!("Preparing an alias");
|
||||
let name = format!("{}-alias", access_key.clone());
|
||||
let output = Command::new("rc")
|
||||
.args([
|
||||
"alias",
|
||||
"set",
|
||||
&name,
|
||||
self.endpoint.as_str(),
|
||||
self.username.as_str(),
|
||||
self.password.as_str(),
|
||||
])
|
||||
.output()?;
|
||||
info!("{:?}", output);
|
||||
info!("Creating a user");
|
||||
let output = Command::new("rc")
|
||||
.args([
|
||||
"admin",
|
||||
"user",
|
||||
"add",
|
||||
&name,
|
||||
access_key.as_str(),
|
||||
secret_key.as_str(),
|
||||
])
|
||||
.output()?;
|
||||
info!("{:?}", output);
|
||||
info!("Removing the alias");
|
||||
let output = Command::new("rc")
|
||||
.args(["alias", "remove", &name])
|
||||
.output()?;
|
||||
info!("{:?}", output);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_user(&self) -> Result<(), anyhow::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn delete_user(&self) -> Result<(), anyhow::Error> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
1
operator/src/s3/dummy.rs
Normal file
1
operator/src/s3/dummy.rs
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
12
operator/src/s3/mod.rs
Normal file
12
operator/src/s3/mod.rs
Normal file
@@ -0,0 +1,12 @@
|
||||
use anyhow::Error;
|
||||
|
||||
pub(crate) mod dummy;
|
||||
pub(crate) mod s3api;
|
||||
|
||||
pub(crate) trait S3Client {
|
||||
async fn list_buckets(self) -> Result<Vec<String>, Error>;
|
||||
async fn create_bucket(self, bucket_name: String) -> Result<(), Error>;
|
||||
async fn delete_bucket(self, bucket_name: String) -> Result<(), Error>;
|
||||
async fn count_objects(self, bucket_name: String) -> Result<u64, Error>;
|
||||
async fn count_size(self, bucket_name: String) -> Result<u64, Error>;
|
||||
}
|
||||
168
operator/src/s3/s3api.rs
Normal file
168
operator/src/s3/s3api.rs
Normal file
@@ -0,0 +1,168 @@
|
||||
use aws_config::{BehaviorVersion, Region};
|
||||
use aws_credential_types::Credentials;
|
||||
use aws_sdk_s3::config::Builder;
|
||||
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
|
||||
use aws_sdk_s3::{Client, config::SharedCredentialsProvider};
|
||||
|
||||
use crate::s3::S3Client;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct S3Api {
|
||||
client: aws_sdk_s3::Client,
|
||||
}
|
||||
|
||||
impl S3Api {
|
||||
pub(crate) async fn new(
|
||||
access_key: String,
|
||||
secret_key: String,
|
||||
endpoint: String,
|
||||
region: String,
|
||||
force_path_style: bool,
|
||||
) -> Self {
|
||||
let creds = Credentials::new(access_key, secret_key, None, None, "static");
|
||||
let config = aws_config::defaults(BehaviorVersion::latest())
|
||||
.credentials_provider(SharedCredentialsProvider::new(creds))
|
||||
.region(Region::new(region))
|
||||
.endpoint_url(endpoint)
|
||||
.load()
|
||||
.await;
|
||||
let conf = Builder::from(&config)
|
||||
.force_path_style(force_path_style)
|
||||
.build();
|
||||
let client = Client::from_conf(conf);
|
||||
Self { client }
|
||||
}
|
||||
}
|
||||
|
||||
impl S3Client for S3Api {
|
||||
async fn list_buckets(self) -> Result<Vec<String>, anyhow::Error> {
|
||||
let mut buckets = self.client.list_buckets().into_paginator().send();
|
||||
let mut result: Vec<String> = vec![];
|
||||
|
||||
match buckets.next().await {
|
||||
Some(output) => {
|
||||
match output {
|
||||
Ok(buckets_res) => buckets_res.buckets().iter().for_each(|bucket| {
|
||||
if let Some(name) = bucket.name() {
|
||||
result.push(name.to_string());
|
||||
}
|
||||
}),
|
||||
Err(err) => {
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
}
|
||||
None => {
|
||||
return Ok(result);
|
||||
}
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn create_bucket(self, bucket_name: String) -> Result<(), anyhow::Error> {
|
||||
match self.client.create_bucket().bucket(bucket_name).send().await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn count_objects(self, bucket_name: String) -> Result<u64, anyhow::Error> {
|
||||
let mut total_count = 0u64;
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let resp = self
|
||||
.client
|
||||
.list_objects_v2()
|
||||
.bucket(bucket_name.clone())
|
||||
.set_continuation_token(continuation_token.clone())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if let Some(contents) = resp.contents {
|
||||
for _ in contents {
|
||||
total_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if resp.is_truncated.unwrap() {
|
||||
continuation_token = resp.next_continuation_token;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(total_count)
|
||||
}
|
||||
|
||||
async fn count_size(self, bucket_name: String) -> Result<u64, anyhow::Error> {
|
||||
let mut total_size = 0u64;
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let resp = self
|
||||
.client
|
||||
.list_objects_v2()
|
||||
.bucket(bucket_name.clone())
|
||||
.set_continuation_token(continuation_token.clone())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if let Some(contents) = resp.contents {
|
||||
for obj in contents {
|
||||
total_size += obj.size.unwrap() as u64;
|
||||
}
|
||||
}
|
||||
|
||||
if resp.is_truncated.unwrap() {
|
||||
continuation_token = resp.next_continuation_token;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(total_size)
|
||||
}
|
||||
|
||||
async fn delete_bucket(self, bucket_name: String) -> Result<(), anyhow::Error> {
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
// List objects in the bucket
|
||||
let resp = self
|
||||
.client
|
||||
.clone()
|
||||
.list_objects_v2()
|
||||
.bucket(bucket_name.clone())
|
||||
.set_continuation_token(continuation_token.clone())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let objects: Vec<ObjectIdentifier> = resp
|
||||
.contents
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|obj| ObjectIdentifier::builder().key(obj.key.unwrap()).build())
|
||||
.collect::<Result<Vec<ObjectIdentifier>, _>>()?;
|
||||
|
||||
if !objects.is_empty() {
|
||||
// Delete objects in batch
|
||||
self.client
|
||||
.clone()
|
||||
.delete_objects()
|
||||
.bucket(bucket_name.clone())
|
||||
.delete(Delete::builder().set_objects(Some(objects)).build()?)
|
||||
.send()
|
||||
.await?;
|
||||
}
|
||||
|
||||
if resp.is_truncated.unwrap() {
|
||||
continuation_token = resp.next_continuation_token;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
self.client
|
||||
.clone()
|
||||
.delete_bucket()
|
||||
.bucket(bucket_name.clone())
|
||||
.send()
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user