first commit
This commit is contained in:
1
consumer/.gitignore
vendored
Normal file
1
consumer/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
target/
|
||||
2612
consumer/Cargo.lock
generated
Normal file
2612
consumer/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
24
consumer/Cargo.toml
Normal file
24
consumer/Cargo.toml
Normal file
@@ -0,0 +1,24 @@
|
||||
[package]
|
||||
name = "atlas_consumer"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[lib]
|
||||
name = "atlas_message"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
ring = "0.17.14"
|
||||
rand = "0.9.2"
|
||||
serde = { version = "1.0.228", features = ["derive"]}
|
||||
serde_json = "1.0.145"
|
||||
base64 = "0.22.1"
|
||||
hex = "0.4.3"
|
||||
tokio = { version = "1.32.0", features = ["full"] }
|
||||
redis = { version = "0.32.7", features = ["aio", "connection-manager", "tokio-comp"] }
|
||||
tracing = "0.1.41"
|
||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter"]}
|
||||
regex = "1.11"
|
||||
chrono = { version = "0.4.42", features = ["serde"] }
|
||||
lapin = { version = "3.7.2" }
|
||||
futures = "0.3.31"
|
||||
30
consumer/Dockerfile
Normal file
30
consumer/Dockerfile
Normal file
@@ -0,0 +1,30 @@
|
||||
#FROM lukemathwalker/cargo-chef:latest-rust-1.91.1-alpine3.22 AS chef
|
||||
#FROM rust:1.91.1-alpine3.22 AS chef
|
||||
FROM clux/muslrust:stable AS chef
|
||||
USER root
|
||||
RUN cargo install cargo-chef
|
||||
WORKDIR /app
|
||||
|
||||
FROM chef AS planner
|
||||
COPY ./Cargo.toml ./
|
||||
COPY ./Cargo.lock ./
|
||||
COPY ./src ./src
|
||||
ENV RUST_BACKTRACE=1
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
FROM chef AS builder
|
||||
COPY --from=planner /app/recipe.json recipe.json
|
||||
ENV RUST_BACKTRACE=1
|
||||
RUN cargo chef cook --release --target x86_64-unknown-linux-musl --recipe-path recipe.json
|
||||
COPY . .
|
||||
RUN cargo build --release --target x86_64-unknown-linux-musl --bin atlas_consumer
|
||||
|
||||
FROM alpine:3.22 AS runtime
|
||||
RUN addgroup -S station && adduser -S station -G station
|
||||
WORKDIR /app
|
||||
COPY --from=builder /app/target/x86_64-unknown-linux-musl/release/atlas_consumer /usr/local/bin/
|
||||
#RUN chown station: /app/atlas_producer && chmod u+x /usr/local/bin/atlas_producer
|
||||
RUN chown -R station: /app
|
||||
RUN chmod og+rx /usr/local/bin/atlas_consumer
|
||||
USER station
|
||||
ENTRYPOINT ["/usr/local/bin/atlas_consumer"]
|
||||
2
consumer/consumer.env
Normal file
2
consumer/consumer.env
Normal file
@@ -0,0 +1,2 @@
|
||||
RUST_BACKTRACE=1
|
||||
LOG_LEVEL=debug
|
||||
167
consumer/src/lib.rs
Normal file
167
consumer/src/lib.rs
Normal file
@@ -0,0 +1,167 @@
|
||||
use ring::{
|
||||
signature::{self, KeyPair, Ed25519KeyPair},
|
||||
digest::{self, SHA256},
|
||||
error::Unspecified,
|
||||
rand::SystemRandom
|
||||
};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use chrono::{self, Utc, DateTime};
|
||||
use serde_json::Value;
|
||||
use base64::{engine::general_purpose, Engine as _};
|
||||
use hex;
|
||||
use redis::{ AsyncCommands, Client, aio::ConnectionManager as ConnectionManager};
|
||||
//use tracing::{self, event, Level};
|
||||
const LOGGING_ID_LENGTH: usize = 16;
|
||||
const KEY_REDIS_PREFIX: &str = "producer: key_record:";
|
||||
//mod atlas_message;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct MessagePayload {
|
||||
pub user_id: u64,
|
||||
pub data: String,
|
||||
pub timestamp: DateTime<Utc>,
|
||||
}
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct SignedMessage {
|
||||
pub header: Header,
|
||||
pub payload: Value,
|
||||
}
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct Header {
|
||||
pub producer_id: String,
|
||||
pub alg: String,
|
||||
pub signature: String,
|
||||
}
|
||||
//error handling
|
||||
#[derive(Debug)]
|
||||
pub enum SecurityError {
|
||||
SignatureError(Unspecified),
|
||||
KeyIntegretyError,
|
||||
JsonError(serde_json::Error),
|
||||
Base64Error(base64::DecodeError),
|
||||
HexError(hex::FromHexError),
|
||||
RedisError(redis::RedisError),
|
||||
KeyNotFound,
|
||||
}
|
||||
|
||||
impl From<Unspecified> for SecurityError {
|
||||
fn from(err: Unspecified) -> Self {
|
||||
SecurityError::SignatureError(err)
|
||||
}
|
||||
}
|
||||
impl From<serde_json::Error> for SecurityError {
|
||||
fn from(err: serde_json::Error) -> Self {
|
||||
SecurityError::JsonError(err)
|
||||
}
|
||||
}
|
||||
impl From<base64::DecodeError> for SecurityError {
|
||||
fn from(err: base64::DecodeError) -> Self {
|
||||
SecurityError::Base64Error(err)
|
||||
}
|
||||
}
|
||||
impl From<hex::FromHexError> for SecurityError {
|
||||
fn from(err: hex::FromHexError) -> Self {
|
||||
SecurityError::HexError(err)
|
||||
}
|
||||
}
|
||||
impl From<redis::RedisError> for SecurityError {
|
||||
fn from(err: redis::RedisError) -> Self {
|
||||
SecurityError::RedisError(err)
|
||||
}
|
||||
}
|
||||
//redis connection
|
||||
async fn create_keyserver_connection(redis_url: &str) -> Result<ConnectionManager, SecurityError> {
|
||||
let client = Client::open(redis_url)?;
|
||||
let connection = client.get_connection_manager().await?;
|
||||
//event!(Level::INFO,
|
||||
println!("Connected to Redis at {}", redis_url);
|
||||
Ok(connection)
|
||||
}
|
||||
pub async fn get_producer_key(
|
||||
redis_url: &str,
|
||||
fingerprint: &str,
|
||||
) -> Result<Vec<u8>, SecurityError> {
|
||||
let key_redis_key = format!("{}{}", KEY_REDIS_PREFIX, fingerprint);
|
||||
let mut keyserver_conn = create_keyserver_connection(redis_url).await?;
|
||||
let key_bytes: Option<Vec<u8>> = keyserver_conn.hget(&key_redis_key, "key_bytes").await?;
|
||||
match key_bytes {
|
||||
Some(bytes) => Ok(bytes),
|
||||
None => Err(SecurityError::KeyNotFound),
|
||||
}
|
||||
}
|
||||
//producer check and store
|
||||
//#[instrument(skip(redis_url, fingerprint, public_key_bytes))]
|
||||
pub async fn store_producer_key(
|
||||
redis_url: &str,
|
||||
fingerprint: &str,
|
||||
public_key_bytes: &[u8],
|
||||
) -> Result<(), SecurityError> {
|
||||
println!("starting connection to keyserver...");
|
||||
let key_redis_key = format!("{}{}", KEY_REDIS_PREFIX, fingerprint);
|
||||
let mut keyserver_conn = create_keyserver_connection(redis_url).await?;
|
||||
println!("checking for existing producer key...");
|
||||
let set_result: bool = keyserver_conn.hset_nx(&key_redis_key, "key_bytes", &public_key_bytes).await?;
|
||||
if set_result == false {
|
||||
println!("Producer key already exists in Redis for ID: {}", fingerprint);
|
||||
return Ok(());
|
||||
}
|
||||
println!("Stored new producer key in Redis for ID: {}", fingerprint);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//key pair generation
|
||||
pub fn generate_ed25519_keypair() -> Result<(Ed25519KeyPair,Vec<u8>), SecurityError> {
|
||||
let rng = SystemRandom::new();
|
||||
let pkcs8_bytes = Ed25519KeyPair::generate_pkcs8(&rng)?;
|
||||
let key_pair = Ed25519KeyPair::from_pkcs8(pkcs8_bytes.as_ref())
|
||||
.map_err(|_| SecurityError::KeyIntegretyError)?;
|
||||
let public_key_bytes = key_pair.public_key().as_ref().to_vec();
|
||||
Ok((key_pair, public_key_bytes))
|
||||
}
|
||||
|
||||
//message signature features
|
||||
pub fn generate_fingerprint(public_key_bytes: &[u8]) -> String {
|
||||
let digest = digest::digest(&SHA256, public_key_bytes);
|
||||
let fingerprint_bytes = &digest.as_ref();
|
||||
hex::encode(fingerprint_bytes)
|
||||
}
|
||||
pub fn get_log_id(full_fingerprint: &str) -> String {
|
||||
let len = full_fingerprint.len().min(LOGGING_ID_LENGTH);
|
||||
full_fingerprint[..len].to_string()
|
||||
}
|
||||
|
||||
//canonicalization of payload
|
||||
pub fn canonicalize_payload(payload: &Value) -> Result<Vec<u8>, serde_json::Error> {
|
||||
serde_json::to_string(payload).map(|s| s.into_bytes())
|
||||
}
|
||||
//message signing serde::Value means {}
|
||||
pub fn sign_payload(key_pair: &Ed25519KeyPair, payload: &Value) -> Result<String, SecurityError> {
|
||||
let canonical_payload = canonicalize_payload(payload)?;
|
||||
let signature = key_pair.sign(&canonical_payload);
|
||||
let signature_b64 = general_purpose::STANDARD.encode(signature.as_ref());
|
||||
Ok(signature_b64)
|
||||
}
|
||||
//message verification
|
||||
pub fn verify_message_signature(
|
||||
public_key_bytes: &[u8],
|
||||
payload: &Value,
|
||||
signature_b64: &str,
|
||||
) -> Result<bool, SecurityError> {
|
||||
let canonical_payload = canonicalize_payload(payload)?;
|
||||
let signature_bytes = general_purpose::STANDARD.decode(signature_b64)?;
|
||||
let public_key = signature::UnparsedPublicKey::new(&signature::ED25519, public_key_bytes);
|
||||
match public_key.verify(&canonical_payload, &signature_bytes) {
|
||||
Ok(_) => Ok(true),
|
||||
Err(_) => Ok(false),
|
||||
}
|
||||
}
|
||||
//identity verification
|
||||
pub fn verify_message_identity(
|
||||
message_producer_id: &str,
|
||||
public_key_bytes: &[u8],
|
||||
) -> Result<bool, SecurityError> {
|
||||
let digest = digest::digest(&SHA256, public_key_bytes);
|
||||
let fingerprint_bytes = &digest.as_ref();
|
||||
let computed_fingerprint = hex::encode(fingerprint_bytes);
|
||||
Ok(computed_fingerprint == message_producer_id)
|
||||
}
|
||||
96
consumer/src/main.rs
Normal file
96
consumer/src/main.rs
Normal file
@@ -0,0 +1,96 @@
|
||||
//se serde::{self, Serialize, Deserialize};
|
||||
//use serde_json::Value;
|
||||
//use chrono::{self, Utc, DateTime};
|
||||
use lapin::{
|
||||
Channel, Connection, ConnectionProperties,
|
||||
Consumer, Error as LapinError, message::Delivery, options::*,
|
||||
types::FieldTable};
|
||||
use futures::{stream::StreamExt};
|
||||
use atlas_message::{
|
||||
MessagePayload, SignedMessage, get_producer_key, verify_message_identity, verify_message_signature};
|
||||
|
||||
const RABBITMQ_URL: &str = "amqp://guest:guest@rabbitmq:5672/%2f";
|
||||
const EXCHANGE_NAME: &str = "signed_exchange";
|
||||
const ROUTING_KEY: &str = "secure.message";
|
||||
const QUEUE_NAME: &str = "signed_queue";
|
||||
|
||||
async fn setup_rabbitmq_channel() -> Result<Channel, LapinError> {
|
||||
let conn = Connection::connect(RABBITMQ_URL,
|
||||
ConnectionProperties::default()).await?;
|
||||
let channel = conn.create_channel().await?;
|
||||
channel.exchange_declare(
|
||||
EXCHANGE_NAME,
|
||||
lapin::ExchangeKind::Direct,
|
||||
ExchangeDeclareOptions {
|
||||
durable: true,
|
||||
..lapin::options::ExchangeDeclareOptions::default()
|
||||
},
|
||||
FieldTable::default(),
|
||||
).await?;
|
||||
println!("Exchange declared: {}", EXCHANGE_NAME);
|
||||
channel.queue_declare(
|
||||
QUEUE_NAME,
|
||||
QueueDeclareOptions {
|
||||
durable: true,
|
||||
..lapin::options::QueueDeclareOptions::default()
|
||||
},
|
||||
FieldTable::default(),
|
||||
).await?;
|
||||
println!("Queue declared: {}", QUEUE_NAME);
|
||||
channel.queue_bind(
|
||||
QUEUE_NAME,
|
||||
EXCHANGE_NAME,
|
||||
ROUTING_KEY,
|
||||
QueueBindOptions::default(),
|
||||
FieldTable::default(),
|
||||
).await?;
|
||||
println!("Queue bound: {}", QUEUE_NAME);
|
||||
Ok(channel)
|
||||
}
|
||||
|
||||
async fn rabbitmq_consumer(channel: &Channel) -> Result<(), LapinError> {
|
||||
let mut consumer = channel.basic_consume(
|
||||
QUEUE_NAME,
|
||||
"secure_consumer",
|
||||
BasicConsumeOptions::default(),
|
||||
FieldTable::default(),
|
||||
).await?;
|
||||
while let Some(delivery) = consumer.next().await {
|
||||
let delivery = delivery?;
|
||||
let message_str = std::str::from_utf8(&delivery.data).unwrap();
|
||||
println!("Received message: {}", message_str);
|
||||
let received_message: SignedMessage = serde_json::from_str(message_str).unwrap();
|
||||
let r_signed_payload = &received_message.payload;
|
||||
let r_message_signature = &received_message.header.signature;
|
||||
let r_message_producer_id = &received_message.header.producer_id;
|
||||
let retrieved_key = get_producer_key("redis://keyserver/", &r_message_producer_id).await.unwrap();
|
||||
|
||||
let r_result_verify_sig = verify_message_signature(&retrieved_key,
|
||||
r_signed_payload, r_message_signature).unwrap();
|
||||
if r_result_verify_sig == true {
|
||||
println!("Received message signature verification succeeded");
|
||||
} else {
|
||||
println!("Received message signature verification FAILED");
|
||||
}
|
||||
let r_result_verify_producer = verify_message_identity(
|
||||
r_message_producer_id, &retrieved_key).unwrap();
|
||||
if r_result_verify_producer == true {
|
||||
println!("Received message producer identity verification succeeded");
|
||||
let plain_payload: MessagePayload = serde_json::from_value(
|
||||
received_message.payload).unwrap();
|
||||
println!("Processing authenicated payload: {:?}", &plain_payload);
|
||||
} else {
|
||||
println!("Received message producer identity verification FAILED");
|
||||
}
|
||||
delivery.ack(BasicAckOptions::default()).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
println!("Starting secure RabbitMQ consumer...");
|
||||
let channel = setup_rabbitmq_channel().await.expect("Failed to set up RabbitMQ channel");
|
||||
rabbitmq_consumer(&channel).await.expect("Failed to consume messages");
|
||||
}
|
||||
Reference in New Issue
Block a user