storage_manager/backend/executor/
load_csv.rs1use std::fs::File;
3use std::io::{self, BufRead, BufReader};
4
5use crate::catalog::types::Catalog;
6use crate::heap::insert_tuple;
7use crate::types::DataValue;
8
9pub fn load_csv(
10 catalog: &Catalog,
11 db_name: &str,
12 table_name: &str,
13 file: &mut File,
14 csv_path: &str,
15) -> io::Result<()> {
16 let db = catalog.databases.get(db_name).ok_or_else(|| {
18 io::Error::new(
19 io::ErrorKind::NotFound,
20 format!("Database '{}' not found", db_name),
21 )
22 })?;
23
24 let table = db.tables.get(table_name).ok_or_else(|| {
25 io::Error::new(
26 io::ErrorKind::NotFound,
27 format!("Table '{}' not found", table_name),
28 )
29 })?;
30
31 let columns = &table.columns;
32 if columns.is_empty() {
33 return Err(io::Error::new(
34 io::ErrorKind::InvalidData,
35 "Table has no columns",
36 ));
37 }
38
39 let csv_file = File::open(csv_path)?;
41 let reader = BufReader::new(csv_file);
42
43 let mut lines = reader.lines();
44
45 if let Some(Ok(_header)) = lines.next() {}
47
48 let mut inserted = 0;
50 for (i, line) in lines.enumerate() {
51 let row = line?;
52 if row.trim().is_empty() {
53 continue;
54 }
55
56 let values: Vec<&str> = row.split(',').map(|v| v.trim()).collect();
57
58 if values.len() != columns.len() {
59 println!(
60 "Skipping row {}: expected {} columns, found {}",
61 i + 1,
62 columns.len(),
63 values.len()
64 );
65 continue;
66 }
67
68 let mut tuple_bytes: Vec<u8> = Vec::new();
70 let mut row_ok = true;
71
72 for (val, col) in values.iter().zip(columns.iter()) {
73 match DataValue::parse_and_encode(&col.data_type, val) {
74 Ok(bytes) => tuple_bytes.extend_from_slice(&bytes),
75 Err(e) => {
76 println!("Skipping row {}: column '{}' — {}", i + 1, col.name, e);
77 row_ok = false;
78 break;
79 }
80 }
81 }
82
83 if !row_ok {
84 continue;
85 }
86
87 if let Err(e) = insert_tuple(file, &tuple_bytes) {
89 println!("Failed to insert row {}: {}", i + 1, e);
90 } else {
91 inserted += 1;
92 }
93 }
94 println!("Total Number of rows inserted: {}", inserted);
95 Ok(())
96}