storage_manager/backend/executor/
load_csv.rs

1///! This file is to test load CSV file without using Buffer Manager.
2use std::fs::File;
3use std::io::{self, BufRead, BufReader};
4
5use crate::catalog::types::Catalog;
6use crate::heap::insert_tuple;
7use crate::types::DataValue;
8
9pub fn load_csv(
10    catalog: &Catalog,
11    db_name: &str,
12    table_name: &str,
13    file: &mut File,
14    csv_path: &str,
15) -> io::Result<()> {
16    // --- 1. Fetch table schema from catalog ---
17    let db = catalog.databases.get(db_name).ok_or_else(|| {
18        io::Error::new(
19            io::ErrorKind::NotFound,
20            format!("Database '{}' not found", db_name),
21        )
22    })?;
23
24    let table = db.tables.get(table_name).ok_or_else(|| {
25        io::Error::new(
26            io::ErrorKind::NotFound,
27            format!("Table '{}' not found", table_name),
28        )
29    })?;
30
31    let columns = &table.columns;
32    if columns.is_empty() {
33        return Err(io::Error::new(
34            io::ErrorKind::InvalidData,
35            "Table has no columns",
36        ));
37    }
38
39    // --- 2. Open and read the CSV file ---
40    let csv_file = File::open(csv_path)?;
41    let reader = BufReader::new(csv_file);
42
43    let mut lines = reader.lines();
44
45    // Skip header line
46    if let Some(Ok(_header)) = lines.next() {}
47
48    // --- 3. Iterate through rows ---
49    let mut inserted = 0;
50    for (i, line) in lines.enumerate() {
51        let row = line?;
52        if row.trim().is_empty() {
53            continue;
54        }
55
56        let values: Vec<&str> = row.split(',').map(|v| v.trim()).collect();
57
58        if values.len() != columns.len() {
59            println!(
60                "Skipping row {}: expected {} columns, found {}",
61                i + 1,
62                columns.len(),
63                values.len()
64            );
65            continue;
66        }
67
68        // --- 4. Encode each field using the column's DataType ---
69        let mut tuple_bytes: Vec<u8> = Vec::new();
70        let mut row_ok = true;
71
72        for (val, col) in values.iter().zip(columns.iter()) {
73            match DataValue::parse_and_encode(&col.data_type, val) {
74                Ok(bytes) => tuple_bytes.extend_from_slice(&bytes),
75                Err(e) => {
76                    println!("Skipping row {}: column '{}' — {}", i + 1, col.name, e);
77                    row_ok = false;
78                    break;
79                }
80            }
81        }
82
83        if !row_ok {
84            continue;
85        }
86
87        // --- 5. Insert tuple into page system ---
88        if let Err(e) = insert_tuple(file, &tuple_bytes) {
89            println!("Failed to insert row {}: {}", i + 1, e);
90        } else {
91            inserted += 1;
92        }
93    }
94    println!("Total Number of rows inserted: {}", inserted);
95    Ok(())
96}