use std::{collections::VecDeque, net::SocketAddr, sync::Arc}; use bytes::{Buf, BytesMut}; use potato_protocol::{ datatypes::{identifier::Identifier, pack::Pack, var_int::VarInt}, packet::{ Packet, RawPacket, clientbound::{ self, GameEventPacket, SetChunkCacheCenterPacket, registry_data::{ Biome, BiomeEffects, DimensionType, PaintingVariant, RegistryData, RegistryDataEntry, WolfVariant, }, status_response, }, serverbound, }, packet_encodable::{Json, Nbt, PacketDecodeError, PacketEncodable, PacketEncodeError}, }; use thiserror::Error; use tokio::{ io::AsyncReadExt, net::tcp::OwnedReadHalf, sync::{Mutex, RwLock, mpsc::Sender}, }; use crate::server::Registries; // Max packet size is 2MB in vanilla const RECV_BUFFER_SIZE: usize = 1024 * 1024 * 2; pub struct Client { address: SocketAddr, packet_sender: Sender, readable_stream: Mutex, state: RwLock, recv_buffer: Mutex, packet_queue: Mutex>, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub enum ConnectionState { Handshaking, Status, Login, Configuration, Play, } #[allow(clippy::enum_variant_names)] #[derive(Error, Debug)] pub enum ConnectionError { #[error("IO error while reading packet: {0}")] IoError(#[from] std::io::Error), #[error("Error while decoding packet: {0}")] DecodeError(#[from] PacketDecodeError), #[error("Error while encoding packet: {0}")] EncodeError(#[from] PacketEncodeError), #[error("Client provided invalid next state")] InvalidNextState, #[error("Finished")] Finished, } impl Client { pub fn new( packet_sender: Sender, readable_stream: OwnedReadHalf, ) -> Result { Ok(Client { address: readable_stream.peer_addr()?, packet_sender, readable_stream: Mutex::new(readable_stream), state: RwLock::new(ConnectionState::Handshaking), recv_buffer: Mutex::new(BytesMut::new()), packet_queue: Mutex::new(VecDeque::new()), }) } pub async fn needs_keep_alive(&self) -> bool { let state = self.state.read().await; *state == ConnectionState::Configuration || *state == ConnectionState::Play } pub async fn read_packets(&self) -> Result<(), ConnectionError> { { // NOTE: Deadlock waiting to happen, maybe there is a way to only lock once both are // available. let mut stream = self.readable_stream.lock().await; let mut recv_buffer = self.recv_buffer.lock().await; recv_buffer.reserve(RECV_BUFFER_SIZE); // TODO: Deal with errors let _ = stream.read_buf(&mut *recv_buffer).await; } // Try to read packets until we encounter an incomplete packet loop { match self.try_read_packet().await { Ok(raw_packet) => { let mut packet_queue = self.packet_queue.lock().await; packet_queue.push_back(raw_packet); } Err(ConnectionError::DecodeError(PacketDecodeError::Incomplete)) => { break; } Err(e) => return Err(e), } } Ok(()) } async fn try_read_packet(&self) -> Result { let mut recv_buffer = self.recv_buffer.lock().await; let length: usize = VarInt::decode_packet(&mut *recv_buffer)?.into(); if recv_buffer.remaining() < length { return Err(PacketDecodeError::Incomplete.into()); } let id = VarInt::decode_packet(&mut *recv_buffer)?; let data = recv_buffer.split_to(length - id.encoded_len()).freeze(); Ok(RawPacket { id, data }) } pub async fn send_packet(&self, packet: &T) -> Result<(), ConnectionError> { let state = { *self.state.read().await }; println!("[{} ({:?})] -> {:?}", self.address, state, packet); let mut buffer = BytesMut::new(); // Encode ID let packet_id: i32 = match state { ConnectionState::Handshaking => T::HANDSHAKE_ID, ConnectionState::Status => T::STATUS_ID, ConnectionState::Login => T::LOGIN_ID, ConnectionState::Configuration => T::CONFIGURATION_ID, ConnectionState::Play => T::PLAY_ID, }; assert!(packet_id != -1, "Packet ID can not be -1"); let packet_id: VarInt = packet_id.into(); packet.encode_packet(&mut buffer)?; match self .packet_sender .send(RawPacket { id: packet_id, data: buffer.freeze(), }) .await { Ok(_) => (), Err(e) => { println!("[{}] Error while sending packet: {}", self.address, e); return Err(ConnectionError::Finished); } } Ok(()) } pub async fn handle_packets( &self, registries: &Arc, ) -> Result<(), ConnectionError> { while let Some(raw_packet) = { let mut packet_queue = self.packet_queue.lock().await; packet_queue.pop_front() } { // TODO: Need to check that this gets dropped immediately let state = { *self.state.read().await }; match state { ConnectionState::Handshaking => { self.handle_handshaking(raw_packet).await?; } ConnectionState::Status => { self.handle_status(raw_packet).await?; } ConnectionState::Login => { self.handle_login(raw_packet).await?; } ConnectionState::Configuration => { self.handle_configuration(raw_packet, registries).await?; } ConnectionState::Play => { // TODO: Might not be able to handle this in the client, might need a full // player for that. // println!("Play packet on client. Ignoring for now") // self.handle_play(raw_packet)?; } } } Ok(()) } async fn handle_handshaking(&self, raw_packet: RawPacket) -> Result<(), ConnectionError> { match raw_packet.id.into() { serverbound::IntentionPacket::HANDSHAKE_ID => { let packet = serverbound::IntentionPacket::from_raw(raw_packet)?; println!("[{} (Handshaking)] <- {:?}", self.address, packet,); self.change_state_num(packet.next_state.into()).await? } // TODO: Legacy server list ping _ => { println!( "[{} (Handshaking)] <- Unknown packet: {}", self.address, raw_packet ); } } Ok(()) } async fn handle_status(&self, raw_packet: RawPacket) -> Result<(), ConnectionError> { match raw_packet.id.into() { serverbound::StatusRequestPacket::STATUS_ID => { let packet = serverbound::StatusRequestPacket; println!("[{} (Status)] <- {:?}", self.address, packet); self.send_packet(&clientbound::StatusResponsePacket { status: Json(status_response::StatusResponseData { version: status_response::Version { name: "Rust 1.21.4".to_owned(), protocol: 769, }, players: status_response::Players { max: 500, online: 0, sample: vec![], }, description: status_response::Description { text: "A rust server!".to_owned(), }, favicon: None, enforce_secure_chat: false, }), }) .await?; } serverbound::PingRequestPacket::STATUS_ID => { let packet = serverbound::PingRequestPacket::from_raw(raw_packet)?; println!("[{} (Status)] <- {:?}", self.address, packet); self.send_packet(&clientbound::PongResponsePacket { timestamp: packet.timestamp, }) .await?; // TODO: Signal to close connection // self.stream.shutdown(Shutdown::Both)?; return Err(ConnectionError::Finished); } _ => { println!( "[{} (Status)] <- Unknown packet: {}", self.address, raw_packet ); } } Ok(()) } async fn handle_login(&self, raw_packet: RawPacket) -> Result<(), ConnectionError> { match raw_packet.id.into() { serverbound::HelloPacket::LOGIN_ID => { let packet = serverbound::HelloPacket::from_raw(raw_packet)?; println!("[{} (Login)] <- {:?}", self.address, packet); self.send_packet(&clientbound::LoginFinishedPacket { uuid: packet.uuid, username: packet.name, properties: vec![], }) .await?; } serverbound::LoginAcknowledgedPacket::LOGIN_ID => { let packet = serverbound::LoginAcknowledgedPacket::from_raw(raw_packet)?; println!("[{} (Login)] <- {:?}", self.address, packet); self.change_state(ConnectionState::Configuration).await?; self.send_packet(&clientbound::SelectKnownPacksPacket { packs: vec![Pack { namespace: "minecraft".to_owned(), id: "core".to_owned(), version: "1.21.4".to_owned(), }], }) .await?; } _ => { println!( "[{} (Login)] <- Unknown packet: {}", self.address, raw_packet ); } } Ok(()) } async fn handle_configuration( &self, raw_packet: RawPacket, registries: &Arc, ) -> Result<(), ConnectionError> { match raw_packet.id.into() { serverbound::ClientInformationPacket::CONFIGURATION_ID => { let packet = serverbound::ClientInformationPacket::from_raw(raw_packet)?; println!("[{} (Configuration)] <- {:?}", self.address, packet); } serverbound::CustomPayloadPacket::CONFIGURATION_ID => { let packet = serverbound::CustomPayloadPacket::from_raw(raw_packet)?; println!("[{} (Configuration)] <- {:?}", self.address, packet); } serverbound::SelectKnownPacksPacket::CONFIGURATION_ID => { let packet = serverbound::SelectKnownPacksPacket::from_raw(raw_packet)?; println!("[{} (Configuration)] <- {:?}", self.address, packet); self.sync_registries(registries).await?; self.send_packet(&clientbound::FinishConfigurationPacket) .await?; } serverbound::FinishConfigurationPacket::CONFIGURATION_ID => { let packet = serverbound::finish_configuration::FinishConfigurationPacket; println!("[{} (Configuration)] <- {:?}", self.address, packet); self.change_state(ConnectionState::Play).await?; self.send_packet(&clientbound::LoginPacket { entity_id: 0, is_hardcore: false, dimension_names: vec![], max_players: 500.into(), view_distance: 10.into(), simulation_distance: 10.into(), reduced_debug_info: false, enable_respawn_screen: true, do_limited_crafting: false, dimension_type: 0.into(), dimension_name: Identifier::minecraft("overworld".to_owned()), hashed_seed: 0, game_mode: 1, previous_game_mode: -1, is_debug: false, is_flat: false, death_info: None, portal_cooldown: 0.into(), sea_level: 63.into(), enforces_secure_chat: false, }) .await?; self.send_packet(&GameEventPacket { event: 13, data: 0., }) .await?; self.send_packet(&SetChunkCacheCenterPacket { x: 0.into(), z: 0.into(), }) .await?; } serverbound::KeepAlivePacket::CONFIGURATION_ID => { let _ = serverbound::KeepAlivePacket::from_raw(raw_packet)?; } _ => { println!( "[{} (Configuration)] <- Unknown packet: {}", self.address, raw_packet ); } } Ok(()) } // fn handle_play(&mut self, length: usize) -> Result<(), ConnectionError> { // let cursor = &mut Cursor::new(&self.recv_buffer[..length]); // let id: i32 = VarInt::decode_packet(cursor)?.into(); // // match id { // serverbound::ClientTickEndPacket::PLAY_ID => { // serverbound::ClientTickEndPacket::decode_packet(cursor)?; // } // serverbound::MovePlayerPosPacket::PLAY_ID => { // let _packet = serverbound::MovePlayerPosPacket::decode_packet(cursor)?; // // println!("[{} (Play)] <- {:?}", self.address, packet); // } // serverbound::MovePlayerPosRotPacket::PLAY_ID => { // let _packet = serverbound::MovePlayerPosRotPacket::decode_packet(cursor)?; // // println!("[{} (Play)] <- {:?}", self.address, packet); // } // serverbound::MovePlayerRotPacket::PLAY_ID => { // let _packet = serverbound::MovePlayerRotPacket::decode_packet(cursor)?; // // println!("[{} (Play)] <- {:?}", self.address, packet); // } // _ => { // println!( // "[{} (Play)] <- Unknown packet id: {} with length {} data: {:X?}", // self.address, // id, // length, // &self.recv_buffer[..length] // ); // } // } // Ok(()) // } async fn sync_registries(&self, registries: &Arc) -> Result<(), ConnectionError> { self.send_packet(&clientbound::RegistryDataPacket { registry_id: Identifier::minecraft_str("dimension_type"), entries: vec![RegistryDataEntry { id: Identifier::minecraft_str("overworld"), data: Some(Nbt(RegistryData::DimensionType(DimensionType { fixed_time: None, has_skylight: 1, has_ceiling: 0, ultrawarm: 0, natural: 1, coordinate_scale: 1., bed_works: 1, respawn_anchor_works: 0, min_y: 0, height: 256, logical_height: 256, infiniburn: "#minecraft:infiniburn_overworld".to_owned(), effects: "minecraft:overworld".to_owned(), ambient_light: 0., piglin_safe: 1, has_raids: 1, monster_spawn_light_level: 0, monster_spawn_block_light_limit: 0, }))), }], }) .await?; self.send_packet(&clientbound::RegistryDataPacket { registry_id: Identifier::minecraft_str("painting_variant"), entries: vec![RegistryDataEntry { id: Identifier::minecraft_str("backyard"), data: Some(Nbt(RegistryData::PaintingVariant(PaintingVariant { asset_id: "minecraft:backyard".to_owned(), height: 2, width: 2, }))), }], }) .await?; self.send_packet(&clientbound::RegistryDataPacket { registry_id: Identifier::minecraft_str("wolf_variant"), entries: vec![RegistryDataEntry { id: Identifier::minecraft_str("ashen"), data: Some(Nbt(RegistryData::WolfVariant(WolfVariant { wild_texture: "minecraft:entity/wolf/wolf_ashen".to_owned(), tame_texture: "minecraft:entity/wolf/wolf_ashen_tame".to_owned(), angry_texture: "minecraft:entity/wolf/wolf_ashen_angry".to_owned(), biomes: vec![], }))), }], }) .await?; let damage_type_registry_entries = registries .damage_types .iter() .map(|(key, value)| RegistryDataEntry { id: key.clone(), data: Some(Nbt(RegistryData::DamegeType(value.clone().into()))), }) .collect(); self.send_packet(&clientbound::RegistryDataPacket { registry_id: Identifier::minecraft_str("damage_type"), entries: damage_type_registry_entries, }) .await?; self.send_packet(&clientbound::RegistryDataPacket { registry_id: Identifier::minecraft_str("worldgen/biome"), entries: vec![RegistryDataEntry { id: Identifier::minecraft_str("plains"), data: Some(Nbt(RegistryData::Biome(Biome { has_precipitation: 0, temperature: 0., temperature_modifier: None, downfall: 0., effects: BiomeEffects { fog_color: 0, water_color: 0, water_fog_color: 0, sky_color: 0, foliage_color: None, grass_color: None, grass_color_modifier: None, particle: None, ambient_sound: None, mood_sound: None, additions_sound: None, music: None, }, }))), }], }) .await?; Ok(()) } async fn change_state_num(&self, state: i32) -> Result<(), ConnectionError> { let new_state = match state { 1 => ConnectionState::Status, 2 => ConnectionState::Login, _ => return Err(ConnectionError::InvalidNextState), }; self.change_state(new_state).await } async fn change_state(&self, new_state: ConnectionState) -> Result<(), ConnectionError> { { let state = self.state.read().await; println!("[{}] {:?} -> {:?}", self.address, state, new_state); } *self.state.write().await = new_state; Ok(()) } }