// DB design notes (original title: "DBの思案")

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include <immintrin.h>
#include <zlib.h>

/// Value types a table column can hold.
/// Enumerators are numbered implicitly in declaration order (BOOL = 0 ... TIME = 16).
enum class DataType
{
    // Boolean and floating point
    BOOL,
    FLOAT,
    DOUBLE,
    // Signed integers
    INT,
    SHORT,
    BYTE,
    LONG,
    // Unsigned integers
    UINT,
    USHORT,
    UBYTE,
    ULONG,
    // Variable-length payloads
    STRING,
    WSTRING,
    BLOB,
    // Temporal values
    DATETIME,
    DATE,
    TIME
};


struct Column
{
    std::string name;
    DataType type;
    size_t size;
    size_t offset;

    Column(const std::string& n, DataType t) : name(n), type(t), size(get_type_size(t)) {}
};

/// Returns how many bytes a value of the given type occupies inside a row.
/// STRING, WSTRING and BLOB report 4 bytes (HDDUltimateTable stores a uint32_t
/// string-table index in that slot). Unrecognised values yield 0.
size_t get_type_size(DataType type)
{
    switch (type)
    {
    // 1-byte values
    case DataType::BOOL:
    case DataType::BYTE:
    case DataType::UBYTE:
        return 1;

    // 2-byte values
    case DataType::SHORT:
    case DataType::USHORT:
        return 2;

    // 4-byte values
    case DataType::FLOAT:
    case DataType::INT:
    case DataType::UINT:
    case DataType::STRING:
    case DataType::WSTRING:
    case DataType::BLOB:
    case DataType::DATE:
    case DataType::TIME:
        return 4;

    // 8-byte values
    case DataType::DOUBLE:
    case DataType::LONG:
    case DataType::ULONG:
    case DataType::DATETIME:
        return 8;

    default:
        return 0;
    }
}


struct Row 
{
    std::vector<char> data;
    std::vector<char> compressed;
    bool dirty = false;
    bool is_compressed = false;
    size_t compressed_size = 0;
    bool use_rle = false;

    Row(size_t size) : data(size) {}
    void compress(bool prefer_rle = false) 
    {
        use_rle = prefer_rle && (data.size() > 100 && std::adjacent_find(data.begin(), data.end(), std::not_equal_to<>()) == data.end());
    
        if (use_rle)
        {
            compressed.clear();
            for (size_t i = 0; i < data.size();) 
            {
                uint8_t count = 1;
                while (i + count < data.size() && data[i] == data[i + count] && count < 255) count++;
                compressed.push_back(data[i]);
                compressed.push_back(count);
                i += count;
            }
        }
        else 
        {
            uLongf dest_len = compressBound(data.size());
            compressed.resize(dest_len);
            compress(reinterpret_cast<Bytef*>(compressed.data()), &dest_len,
                reinterpret_cast<const Bytef*>(data.data()), data.size());
            compressed.resize(dest_len);
        }
        compressed_size = compressed.size();
        is_compressed = true;
    }
    void decompress(size_t orig_size) 
    {
        data.resize(orig_size);
        if (use_rle) {
            size_t pos = 0;
            for (size_t i = 0; i < compressed.size() && pos < orig_size; i += 2) {
                char value = compressed[i];
                uint8_t count = compressed[i + 1];
                for (uint8_t j = 0; j < count && pos < orig_size; ++j) {
                    data[pos++] = value;
                }
            }
        }
        else {
            uLongf dest_len = orig_size;
            uncompress(reinterpret_cast<Bytef*>(data.data()), &dest_len,
                reinterpret_cast<const Bytef*>(compressed.data()), compressed.size());
        }
        is_compressed = false;
    }
};

/// Lookup from a column value (as text) to the set of row indexes holding it.
/// Despite the name, the in-memory structure is a std::map of std::set.
/// Every successful mutation rewrites the whole backing file.
///
/// insert/remove/find are guarded by an internal shared mutex. Construction
/// and move operations are not synchronized and must not race with other use.
class BTreeIndex {
    std::map<std::string, std::set<size_t>> index;
    mutable std::shared_mutex mutex;  // mutable so the const find() can take a shared lock
    std::string filename;             // empty means "no backing file"

public:
    /// Creates an empty index without a backing file; nothing is persisted.
    /// Required for storage in std::map when accessed through operator[].
    BTreeIndex() = default;

    /// Creates an index backed by `fname`, loading it if the file exists.
    BTreeIndex(const std::string& fname) : filename(fname) {
        if (std::filesystem::exists(fname)) load();
    }

    /// Takes over the entries and file name of `other`; the mutex is not moved.
    BTreeIndex(BTreeIndex&& other) noexcept
        : index(std::move(other.index)), filename(std::move(other.filename)) {}

    /// Replaces this index's entries and file name with those of `other`.
    BTreeIndex& operator=(BTreeIndex&& other) noexcept {
        if (this != &other) {
            index = std::move(other.index);
            filename = std::move(other.filename);
        }
        return *this;
    }

    /// Records that row `row_idx` holds `value` and persists the index.
    void insert(const std::string& value, size_t row_idx) {
        std::unique_lock lock(mutex);
        index[value].insert(row_idx);
        save();
    }

    /// Forgets that row `row_idx` holds `value`. Unknown values or rows are a
    /// no-op; a value whose last row is removed is dropped from the index.
    void remove(const std::string& value, size_t row_idx) {
        std::unique_lock lock(mutex);
        const auto it = index.find(value);
        if (it == index.end()) return;
        if (it->second.erase(row_idx) == 0) return;  // nothing changed, nothing to persist
        if (it->second.empty()) index.erase(it);
        save();
    }

    /// Returns the rows holding `value` in ascending order (empty if none).
    std::vector<size_t> find(const std::string& value) const {
        std::shared_lock lock(mutex);
        const auto it = index.find(value);
        if (it == index.end()) return {};
        return std::vector<size_t>(it->second.begin(), it->second.end());
    }

private:
    /// Writes the whole index to `filename`. Caller must hold the mutex.
    /// Layout: u32 entry count, then per entry: u32 value length, value bytes,
    /// u32 row count, one u64 per row. Best effort: an unopenable file is ignored.
    void save() const {
        if (filename.empty()) return;
        std::ofstream file(filename, std::ios::binary | std::ios::trunc);
        if (!file) return;

        const auto put_u32 = [&file](size_t n) {
            const uint32_t v = static_cast<uint32_t>(n);
            file.write(reinterpret_cast<const char*>(&v), sizeof(v));
        };

        put_u32(index.size());
        for (const auto& [val, rows] : index) {
            put_u32(val.size());
            file.write(val.data(), static_cast<std::streamsize>(val.size()));
            put_u32(rows.size());
            for (const size_t row : rows) {
                // Fixed width so the file does not depend on the platform's size_t
                // (same bytes as before wherever size_t is 64 bits).
                const uint64_t v = row;
                file.write(reinterpret_cast<const char*>(&v), sizeof(v));
            }
        }
    }

    /// Reads entries from `filename` into the index. Loading stops at the first
    /// short or implausible read; entries read completely before that are kept.
    void load() {
        std::ifstream file(filename, std::ios::binary);
        if (!file) return;

        std::error_code ec;
        const auto file_bytes = std::filesystem::file_size(filename, ec);

        uint32_t size = 0;
        if (!file.read(reinterpret_cast<char*>(&size), sizeof(size))) return;
        for (uint32_t i = 0; i < size; ++i) {
            uint32_t val_len = 0;
            if (!file.read(reinterpret_cast<char*>(&val_len), sizeof(val_len))) return;
            // A length larger than the file itself can only come from corruption.
            if (!ec && val_len > file_bytes) return;
            std::string val(val_len, '\0');
            if (!file.read(val.data(), val_len)) return;

            uint32_t row_count = 0;
            if (!file.read(reinterpret_cast<char*>(&row_count), sizeof(row_count))) return;
            std::set<size_t> rows;
            for (uint32_t j = 0; j < row_count; ++j) {
                uint64_t row = 0;
                if (!file.read(reinterpret_cast<char*>(&row), sizeof(row))) return;
                rows.insert(static_cast<size_t>(row));
            }
            index[std::move(val)] = std::move(rows);
        }
    }
};

/// Cache of rows keyed by row index with least-recently-used eviction.
///
/// A key may be stored with a null row; get() then returns nullptr but still
/// counts the lookup as a hit. Pointers returned by get() are owned by the
/// cache and become invalid when that entry is replaced or evicted.
class LRUCache
{
    struct Entry
    {
        std::unique_ptr<Row> row;
        std::list<size_t>::iterator lru_pos;  // this key's node in lru_order
    };

    size_t max_size;
    std::unordered_map<size_t, Entry> cache;
    std::list<size_t> lru_order;  // front = most recently used, back = next to evict
    mutable std::shared_mutex mutex;
    size_t hit_count = 0, miss_count = 0;

    /// Drops the least recently used entry. Caller must already hold the
    /// exclusive lock (locking here again would self-deadlock).
    void evict()
    {
        if (lru_order.empty()) return;
        const size_t oldest = lru_order.back();
        lru_order.pop_back();
        cache.erase(oldest);
    }

public:
    /// @param size  maximum number of entries kept (at least one entry is always retained)
    LRUCache(size_t size) : max_size(size) {}

    /// Looks up a row and marks it most recently used.
    /// @return the cached row, or nullptr if the key is absent or stored as null
    Row* get(size_t row_idx)
    {
        // Exclusive lock: a lookup updates the counters and the recency list.
        std::unique_lock lock(mutex);
        const auto it = cache.find(row_idx);
        if (it == cache.end())
        {
            ++miss_count;
            return nullptr;
        }
        ++hit_count;
        lru_order.splice(lru_order.begin(), lru_order, it->second.lru_pos);
        return it->second.row.get();
    }

    /// Stores (or replaces) the row for `row_idx` and marks it most recently used.
    /// Inserting a new key first evicts old entries until there is room.
    void put(size_t row_idx, std::unique_ptr<Row> row)
    {
        std::unique_lock lock(mutex);
        const auto it = cache.find(row_idx);
        if (it != cache.end())
        {
            // Existing key: swap the payload and reuse its list node so the
            // key never appears twice in lru_order.
            it->second.row = std::move(row);
            lru_order.splice(lru_order.begin(), lru_order, it->second.lru_pos);
            return;
        }
        while (!cache.empty() && cache.size() >= max_size) evict();
        lru_order.push_front(row_idx);
        cache.emplace(row_idx, Entry{ std::move(row), lru_order.begin() });
    }

    /// Grows the limit by 10 when the hit rate is below the threshold; shrinks
    /// it by 5 when the hit rate exceeds threshold + 0.1 and the limit is above 50.
    /// A smaller limit takes effect on the next put() of a new key.
    void adjust_size(double hit_rate_threshold)
    {
        std::unique_lock lock(mutex);
        // +1 keeps the division defined before any lookup has happened.
        const double hit_rate = hit_count / static_cast<double>(hit_count + miss_count + 1);
        if (hit_rate < hit_rate_threshold) max_size += 10;
        else if (hit_rate > hit_rate_threshold + 0.1 && max_size > 50) max_size -= 5;
    }

    /// Returns "Cache Hit: <hits>, Miss: <misses>, Size: <max_size>".
    std::string stats() const
    {
        std::shared_lock lock(mutex);
        return "Cache Hit: " + std::to_string(hit_count) + ", Miss: " + std::to_string(miss_count) +
            ", Size: " + std::to_string(max_size);
    }
};

// Table that keeps rows in an LRUCache and writes them, compressed, to two
// shard files (row index % 2 selects the shard). Columns can be indexed with
// BTreeIndex, and operations performed inside a transaction are appended to a
// text log.
//
// NOTE(review): issues visible in this class that should be confirmed and fixed:
//  - Several public methods lock transaction_mutex and then call other public
//    methods that lock it again on the same thread (update_all -> get_value /
//    set_value, delete_row / print / group_by / create_index -> get_value).
//    std::shared_mutex does not support recursive locking.
//  - log_operation calls write_file while holding file_mutex; write_file locks
//    file_mutex again.
//  - write_file stores Row::compressed_size with sizeof(size_t); load_row reads
//    the size back as a uint32_t.
//  - Nothing in this class reads row_offsets or strings back from disk, and the
//    constructor truncates both shard files before calling recover().
class HDDUltimateTable 
{

public:

    std::vector<Column> columns;                       // column definitions, in AddColumn order
    size_t row_size = 0;                               // sum of all column sizes (bytes per uncompressed row)
    std::vector<uint64_t> row_offsets;                 // per-row offset passed to seekg/seekp on the row's shard file
    std::list<std::pair<uint64_t, size_t>> free_list;  // (offset, size) of released row slots
    std::vector<std::string> strings;                  // string table; STRING/WSTRING/BLOB cells hold an index into it
    uint32_t next_string_idx = 0;                      // index returned for the next newly added string
    LRUCache cache{ 100 };                             // row cache, constructed with size 100
    std::map<size_t, BTreeIndex> indexes;              // column index -> index on that column

private:

    std::string filename;                 // base name used for shard, log and index files
    std::string shard_dir;                // directory holding those files
    std::mutex file_mutex;                // guards the string table, shard writes and log appends
    std::shared_mutex transaction_mutex;  // taken (shared or exclusive) by the public operations
    bool in_transaction = false;
    std::ofstream log_file;
    std::vector<std::pair<std::string, std::vector<char>>> log_entries;  // (log line, row bytes captured for SET)
    size_t log_size = 0;                  // bytes accumulated since the last checkpoint
    double hdd_rpm = 7200;                // write_file prefers RLE when this is below 5400

    // Returns the string-table index of `str`, appending it if it is not present.
    // Lookup is a linear scan. `type` is not used.
    uint32_t get_string_idx(const std::string& str, DataType type) 
    {
        std::unique_lock lock(file_mutex);
        for (uint32_t i = 0; i < strings.size(); ++i) 
        {
            if (strings[i] == str) return i;
        }
        strings.push_back(str);
        return next_string_idx++;
    }

    // Builds "<shard_dir>/<filename>_shard<id>.bin".
    std::string get_shard_file(int shard_id)
    {
        return shard_dir + "/" + filename + "_shard" + std::to_string(shard_id) + ".bin";
    }

    // Makes sure row `row_idx` is in the cache. Returns immediately when
    // cache.get() yields a row; otherwise reads the row's record (size, RLE
    // flag, compressed bytes) from its shard file at row_offsets[row_idx],
    // decompresses it to row_size bytes and stores it in the cache.
    // Throws std::runtime_error when the shard file cannot be opened.
    // NOTE(review): row_idx is not range-checked and the reads are not checked.
    void load_row(size_t row_idx) 
    {
        if (cache.get(row_idx)) return;

        int shard_id = row_idx % 2;
        std::ifstream file(get_shard_file(shard_id), std::ios::binary);
        if (!file) throw std::runtime_error("ファイルオープン失敗: " + get_shard_file(shard_id));
        file.seekg(row_offsets[row_idx]);
        auto row = std::make_unique<Row>(row_size);
        uint32_t comp_size;
        bool is_rle;
        file.read(reinterpret_cast<char*>(&comp_size), sizeof(comp_size));
        file.read(reinterpret_cast<char*>(&is_rle), sizeof(is_rle));
        row->compressed.resize(comp_size);
        file.read(row->compressed.data(), comp_size);
        // NOTE(review): decompress() runs before use_rle is taken from the file,
        // so it sees the Row's default use_rle (false).
        row->decompress(row_size);
        row->use_rle = is_rle;
        cache.put(row_idx, std::move(row));
    }

    // Starts a detached thread that loads rows [start_idx, start_idx + count)
    // that are not in the cache.
    // NOTE(review): the thread captures `this` and is detached, so nothing waits
    // for it before the table is destroyed; it also reads row_offsets without a lock.
    void prefetch_rows(size_t start_idx, size_t count) 
    {
        std::thread prefetch_thread([this, start_idx, count]() 
            {
            for (size_t i = start_idx; i < start_idx + count && i < row_offsets.size(); ++i)
            {
                if (!cache.get(i)) load_row(i);
            }
            });
        prefetch_thread.detach();
    }

    // Rewrites both shard files. Each file is truncated, then receives the
    // number of rows in that shard, those rows' offsets, and - for every row of
    // the shard currently in the cache - a record (compressed_size, use_rle,
    // compressed bytes) at row_offsets[i]. Dirty rows are compressed first.
    // With checkpoint == true inside a transaction, the log file is truncated
    // and the in-memory log is cleared.
    // NOTE(review): rows not in the cache are not written to the truncated
    // file, and `dirty` is never reset in this class.
    void write_file(bool checkpoint = false) {
        std::unique_lock lock(file_mutex);
        for (int shard_id = 0; shard_id < 2; ++shard_id) {
            std::ofstream file(get_shard_file(shard_id), std::ios::binary | std::ios::trunc);
            // Even rows go to shard 0, odd rows to shard 1; shard 0 gets the extra row of an odd total.
            uint32_t num_rows = row_offsets.size() / 2 + (row_offsets.size() % 2 > shard_id);
            file.write(reinterpret_cast<const char*>(&num_rows), sizeof(num_rows));
            for (size_t i = shard_id; i < row_offsets.size(); i += 2) {
                file.write(reinterpret_cast<const char*>(&row_offsets[i]), sizeof(row_offsets[i]));
            }

            for (size_t i = shard_id; i < row_offsets.size(); i += 2) {
                Row* row = cache.get(i);
                if (row) {
                    if (row->dirty) {
                        bool use_rle = (hdd_rpm < 5400);
                        row->compress(use_rle);
                    }
                    file.seekp(row_offsets[i]);
                    // NOTE(review): compressed_size is a size_t here but load_row reads a uint32_t.
                    file.write(reinterpret_cast<const char*>(&row->compressed_size), sizeof(row->compressed_size));
                    file.write(reinterpret_cast<const char*>(&row->use_rle), sizeof(row->use_rle));
                    file.write(row->compressed.data(), row->compressed_size);
                }
            }
        }
        if (checkpoint && in_transaction) {
            log_file.close();
            log_file.open(shard_dir + "/" + filename + ".log", std::ios::trunc);
            log_entries.clear();
            log_size = 0;
        }
    }

    // Records an operation while a transaction is open (no-op otherwise).
    // The log line is "<op> <row_idx> <values...>". For "SET" the row's current
    // bytes are stored next to the line in log_entries. When more than 1 MiB
    // has accumulated, write_file(true) is called as a checkpoint.
    // The default row_idx of -1 converts to the largest size_t value.
    // NOTE(review): set_value calls this after modifying the row, so the bytes
    // captured for SET appear to be the new contents - confirm against rollback.
    // NOTE(review): write_file(true) is called with file_mutex already held.
    void log_operation(const std::string& op, size_t row_idx = -1, const std::vector<std::string>& values = {}) {
        if (!in_transaction) return;
        std::stringstream log;
        log << op << " " << row_idx;
        std::vector<char> old_data;
        if (op == "SET" && row_idx != -1) {
            Row* row = cache.get(row_idx);
            if (row) old_data = row->data;
        }
        for (const auto& v : values) log << " " << v;
        std::unique_lock lock(file_mutex);
        log_entries.push_back({ log.str(), old_data });
        log_file << log.str() << "\n";
        log_size += log.str().size() + old_data.size();
        if (log_size > 1024 * 1024) write_file(true);
    }

public:
    // Creates the shard directory, (re)creates both shard files with a row
    // count of 0, opens the log in append mode and calls recover() when the log
    // file exists.
    // fname: base file name; dir: directory for all files; rpm: stored in hdd_rpm.
    // NOTE(review): the log is opened before the exists() check, so the check
    // presumably always succeeds - confirm.
    HDDUltimateTable(const std::string& fname, const std::string& dir = ".", double rpm = 7200)
        : filename(fname), shard_dir(dir), hdd_rpm(rpm) {
        std::filesystem::create_directory(shard_dir);
        for (int shard_id = 0; shard_id < 2; ++shard_id) {
            std::ofstream file(get_shard_file(shard_id), std::ios::binary | std::ios::trunc);
            uint32_t num_rows = 0;
            file.write(reinterpret_cast<const char*>(&num_rows), sizeof(num_rows));
        }
        log_file.open(shard_dir + "/" + filename + ".log", std::ios::app);
        if (std::filesystem::exists(shard_dir + "/" + filename + ".log")) recover();
    }

    // Closes the log file if it is open.
    ~HDDUltimateTable() {
        if (log_file.is_open()) log_file.close();
    }

    // Resets the schema, row offsets, free list and string table, then rewrites
    // the shard files. Throws std::runtime_error inside a transaction.
    // The cache is not cleared here.
    void CreateTable() {
        std::unique_lock lock(transaction_mutex);
        if (in_transaction) throw std::runtime_error("トランザクション中はCREATE TABLE不可");
        columns.clear();
        row_size = 0;
        row_offsets.clear();
        free_list.clear();
        strings.clear();
        next_string_idx = 0;
        write_file();
    }

    // Appends a column; its offset is the current row_size, which then grows by
    // the column's size.
    void AddColumn(const std::string& name, DataType type) {
        std::unique_lock lock(transaction_mutex);
        Column col(name, type);
        col.offset = row_size;
        row_size += col.size;
        columns.push_back(col);
    }

    // Opens a transaction and clears the in-memory log.
    // Throws std::runtime_error if one is already open.
    void begin_transaction() {
        std::unique_lock lock(transaction_mutex);
        if (in_transaction) throw std::runtime_error("既にトランザクション中");
        in_transaction = true;
        log_entries.clear();
    }

    // Ends the transaction, rewrites the shard files and clears the in-memory log.
    // Throws std::runtime_error if no transaction is open.
    void commit() {
        std::unique_lock lock(transaction_mutex);
        if (!in_transaction) throw std::runtime_error("トランザクションが開始されていません");
        in_transaction = false;
        write_file();
        log_entries.clear();
    }

    // Ends the transaction and walks the in-memory log backwards:
    //   INSERT -> the row's cache entry is set to null and its offset is added to free_list
    //   DELETE -> the row is loaded again via load_row
    //   SET    -> the row's bytes are replaced by the bytes stored with the log entry
    // Then clears the log and rewrites the shard files.
    // Throws std::runtime_error if no transaction is open.
    void rollback() {
        std::unique_lock lock(transaction_mutex);
        if (!in_transaction) throw std::runtime_error("トランザクションが開始されていません");
        in_transaction = false;
        for (auto it = log_entries.rbegin(); it != log_entries.rend(); ++it) {
            std::istringstream iss(it->first);
            std::string op;
            size_t row_idx;
            iss >> op >> row_idx;
            if (op == "INSERT") {
                cache.put(row_idx, nullptr);
                free_list.push_back({ row_offsets[row_idx], 0 });
            }
            else if (op == "DELETE") {
                load_row(row_idx);
            }
            else if (op == "SET") {
                size_t col_idx;
                std::string value;
                iss >> col_idx >> value;
                Row* row = cache.get(row_idx);
                if (row && !it->second.empty()) row->data = it->second;
            }
        }
        log_entries.clear();
        write_file();
    }

    // Builds a new row from `values` (one text value per column, parsed
    // according to the column type), updates any column indexes, picks a file
    // slot (a free_list entry if one is large enough, otherwise a new offset
    // appended to row_offsets), puts the row in the cache and logs "INSERT".
    // NOTE(review): values.size() is not checked against columns.size().
    // NOTE(review): row_offsets and free_list are modified under a shared lock.
    void insert(const std::vector<std::string>& values) {
        std::shared_lock lock(transaction_mutex);
        auto row = std::make_unique<Row>(row_size);
        for (size_t i = 0; i < columns.size(); ++i) {
            char* ptr = row->data.data() + columns[i].offset;
            switch (columns[i].type) {
            case DataType::BOOL: *ptr = std::stoi(values[i]) != 0; break;
            case DataType::FLOAT: *(float*)ptr = std::stof(values[i]); break;
            case DataType::DOUBLE: *(double*)ptr = std::stod(values[i]); break;
            case DataType::INT: *(int32_t*)ptr = std::stoi(values[i]); break;
            case DataType::SHORT: *(int16_t*)ptr = std::stoi(values[i]); break;
            case DataType::BYTE: *(int8_t*)ptr = std::stoi(values[i]); break;
            case DataType::LONG: *(int64_t*)ptr = std::stoll(values[i]); break;
            case DataType::UINT: *(uint32_t*)ptr = std::stoul(values[i]); break;
            case DataType::USHORT: *(uint16_t*)ptr = std::stoul(values[i]); break;
            case DataType::UBYTE: *(uint8_t*)ptr = std::stoul(values[i]); break;
            case DataType::ULONG: *(uint64_t*)ptr = std::stoull(values[i]); break;
            case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
                *(uint32_t*)ptr = get_string_idx(values[i], columns[i].type); break;
            case DataType::DATETIME: *(int64_t*)ptr = std::stoll(values[i]); break;
            case DataType::DATE: *(int32_t*)ptr = std::stoi(values[i]); break;
            case DataType::TIME: *(int32_t*)ptr = std::stoi(values[i]); break;
            }
            // NOTE(review): the index is given row_offsets.size() as the row
            // number, which differs from row_idx below when a free slot is reused.
            if (indexes.count(i)) indexes[i].insert(values[i], row_offsets.size());
        }
        row->dirty = true;

        uint64_t offset;
        size_t row_idx;
        if (!free_list.empty()) {
            // First fit. The row has not been compressed yet, so compressed_size is still 0 here.
            auto it = free_list.begin();
            while (it != free_list.end() && it->second < row->compressed_size) ++it;
            if (it != free_list.end()) {
                offset = it->first;
                // The reused row index is the position of this offset in row_offsets.
                row_idx = std::distance(row_offsets.begin(), std::find(row_offsets.begin(), row_offsets.end(), offset));
                free_list.erase(it);
            }
            else {
                // New slot: 4-byte header + one 8-byte offset per existing row
                // + one (row_size + 4 + sizeof(bool)) record per existing row.
                offset = sizeof(uint32_t) + row_offsets.size() * sizeof(uint64_t) + row_offsets.size() * (row_size + sizeof(uint32_t) + sizeof(bool));
                row_idx = row_offsets.size();
                row_offsets.push_back(offset);
            }
        }
        else {
            // Same new-slot computation as above.
            offset = sizeof(uint32_t) + row_offsets.size() * sizeof(uint64_t) + row_offsets.size() * (row_size + sizeof(uint32_t) + sizeof(bool));
            row_idx = row_offsets.size();
            row_offsets.push_back(offset);
        }
        cache.put(row_idx, std::move(row));
        log_operation("INSERT", row_idx, values);
    }

    // Returns the indexes of rows whose column `col_idx` equals `target`.
    // With use_index and an index on the column, the index answers directly.
    // Otherwise only INT columns are searched (other types return an empty
    // result): rows are loaded eight at a time and compared with 256-bit
    // integer intrinsics.
    std::vector<size_t> search_simd(size_t col_idx, const std::string& target, bool use_index = true) {
        std::shared_lock lock(transaction_mutex);
        if (use_index && indexes.count(col_idx)) {
            return indexes[col_idx].find(target);
        }

        std::vector<size_t> results;
        if (columns[col_idx].type != DataType::INT) return results;

        int32_t target_val = std::stoi(target);
        constexpr size_t SIMD_WIDTH = 8;
        prefetch_rows(0, std::min<size_t>(SIMD_WIDTH * 2, row_offsets.size()));

        for (size_t i = 0; i < row_offsets.size(); i += SIMD_WIDTH) {
            __m256i target_vec = _mm256_set1_epi32(target_val);
            alignas(32) int32_t values[SIMD_WIDTH];

            // Gather the column value of up to eight consecutive rows; rows
            // missing from the cache contribute 0. Lanes past the last row are
            // left unset and are skipped when the mask is read below.
            for (size_t j = 0; j < SIMD_WIDTH && (i + j) < row_offsets.size(); ++j) {
                load_row(i + j);
                Row* row = cache.get(i + j);
                if (row) std::memcpy(&values[j], row->data.data() + columns[col_idx].offset, sizeof(int32_t));
                else values[j] = 0;
            }

            __m256i data_vec = _mm256_load_si256(reinterpret_cast<const __m256i*>(values));
            __m256i cmp = _mm256_cmpeq_epi32(data_vec, target_vec);
            // One mask bit per 32-bit lane.
            int mask = _mm256_movemask_ps(_mm256_castsi256_ps(cmp));

            for (size_t j = 0; j < SIMD_WIDTH && (i + j) < row_offsets.size(); ++j) {
                // The cache check drops rows that matched only because a missing row was read as 0.
                if (mask & (1 << j) && cache.get(i + j)) results.push_back(i + j);
            }
        }
        return results;
    }

    // Sets column `col_idx` to `new_value` in every row currently in the cache,
    // keeping the column's index (if any) in step, then logs "UPDATE_ALL" with
    // col_idx in the row-index position of the log line.
    // Both branches run the same loop; the second only prints a message first.
    void update_all(size_t col_idx, const std::string& new_value) {
        std::shared_lock lock(transaction_mutex);
        if (row_offsets.size() < 1000) {
            for (size_t i = 0; i < row_offsets.size(); ++i) {
                if (cache.get(i)) {
                    std::string old_value = get_value(i, col_idx);
                    set_value(i, col_idx, new_value);
                    if (indexes.count(col_idx)) {
                        indexes[col_idx].remove(old_value, i);
                        indexes[col_idx].insert(new_value, i);
                    }
                }
            }
        }
        else {
            std::cout << "OpenCLで全行更新(擬似)\n";
            for (size_t i = 0; i < row_offsets.size(); ++i) {
                if (cache.get(i)) {
                    std::string old_value = get_value(i, col_idx);
                    set_value(i, col_idx, new_value);
                    if (indexes.count(col_idx)) {
                        indexes[col_idx].remove(old_value, i);
                        indexes[col_idx].insert(new_value, i);
                    }
                }
            }
        }
        log_operation("UPDATE_ALL", col_idx, { new_value });
    }

    // Returns the value of a cell as text. String-like columns are resolved
    // through the string table; all other types go through std::to_string.
    // Returns "DELETED" when the row is not available after load_row.
    std::string get_value(size_t row_idx, size_t col_idx) {
        std::shared_lock lock(transaction_mutex);
        load_row(row_idx);
        Row* row = cache.get(row_idx);
        if (!row) return "DELETED";
        char* ptr = row->data.data() + columns[col_idx].offset;
        switch (columns[col_idx].type) {
        case DataType::BOOL: return std::to_string(*(bool*)ptr);
        case DataType::FLOAT: return std::to_string(*(float*)ptr);
        case DataType::DOUBLE: return std::to_string(*(double*)ptr);
        case DataType::INT: return std::to_string(*(int32_t*)ptr);
        case DataType::SHORT: return std::to_string(*(int16_t*)ptr);
        case DataType::BYTE: return std::to_string(*(int8_t*)ptr);
        case DataType::LONG: return std::to_string(*(int64_t*)ptr);
        case DataType::UINT: return std::to_string(*(uint32_t*)ptr);
        case DataType::USHORT: return std::to_string(*(uint16_t*)ptr);
        case DataType::UBYTE: return std::to_string(*(uint8_t*)ptr);
        case DataType::ULONG: return std::to_string(*(uint64_t*)ptr);
        case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
            return strings[*(uint32_t*)ptr];
        case DataType::DATETIME: return std::to_string(*(int64_t*)ptr);
        case DataType::DATE: return std::to_string(*(int32_t*)ptr);
        case DataType::TIME: return std::to_string(*(int32_t*)ptr);
        default: return "";
        }
    }

    // Parses `value` according to the column type, writes it into the cell,
    // marks the row dirty and logs "SET". Does nothing when the row is not
    // available after load_row.
    void set_value(size_t row_idx, size_t col_idx, const std::string& value) {
        std::shared_lock lock(transaction_mutex);
        load_row(row_idx);
        Row* row = cache.get(row_idx);
        if (!row) return;
        char* ptr = row->data.data() + columns[col_idx].offset;
        switch (columns[col_idx].type) {
        case DataType::BOOL: *ptr = std::stoi(value) != 0; break;
        case DataType::FLOAT: *(float*)ptr = std::stof(value); break;
        case DataType::DOUBLE: *(double*)ptr = std::stod(value); break;
        case DataType::INT: *(int32_t*)ptr = std::stoi(value); break;
        case DataType::SHORT: *(int16_t*)ptr = std::stoi(value); break;
        case DataType::BYTE: *(int8_t*)ptr = std::stoi(value); break;
        case DataType::LONG: *(int64_t*)ptr = std::stoll(value); break;
        case DataType::UINT: *(uint32_t*)ptr = std::stoul(value); break;
        case DataType::USHORT: *(uint16_t*)ptr = std::stoul(value); break;
        case DataType::UBYTE: *(uint8_t*)ptr = std::stoul(value); break;
        case DataType::ULONG: *(uint64_t*)ptr = std::stoull(value); break;
        case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
            *(uint32_t*)ptr = get_string_idx(value, columns[col_idx].type); break;
        case DataType::DATETIME: *(int64_t*)ptr = std::stoll(value); break;
        case DataType::DATE: *(int32_t*)ptr = std::stoi(value); break;
        case DataType::TIME: *(int32_t*)ptr = std::stoi(value); break;
        }
        row->dirty = true;
        log_operation("SET", row_idx, { std::to_string(col_idx), value });
    }

    // Deletes a row: its slot goes onto free_list, its values are removed from
    // all column indexes, its cache entry is replaced by null and "DELETE" is
    // logged. Does nothing when the row is not available after load_row.
    void delete_row(size_t row_idx) {
        std::shared_lock lock(transaction_mutex);
        load_row(row_idx);
        Row* row = cache.get(row_idx);
        if (!row) return;
        free_list.push_back({ row_offsets[row_idx], row->compressed_size });
        for (size_t i = 0; i < columns.size(); ++i) {
            if (indexes.count(i)) indexes[i].remove(get_value(row_idx, i), row_idx);
        }
        cache.put(row_idx, nullptr);
        log_operation("DELETE", row_idx);
    }

    // Rewrites the shard files (no checkpoint).
    void flush() { write_file(); }

    // Prints every row to std::cout as "name: value " pairs, one row per line.
    void print() {
        std::shared_lock lock(transaction_mutex);
        for (size_t i = 0; i < row_offsets.size(); ++i) {
            for (size_t j = 0; j < columns.size(); ++j) {
                std::cout << columns[j].name << ": " << get_value(i, j) << " ";
            }
            std::cout << "\n";
        }
    }

    // Creates an index on column `col_idx` (if there is none yet) backed by
    // "<shard_dir>/<filename>.idx<col_idx>" and fills it from the rows
    // currently in the cache.
    // Throws std::runtime_error for an out-of-range column.
    void create_index(size_t col_idx) {
        std::unique_lock lock(transaction_mutex);
        if (col_idx >= columns.size()) throw std::runtime_error("無効な列インデックス");
        if (!indexes.count(col_idx)) {
            indexes[col_idx] = BTreeIndex(shard_dir + "/" + filename + ".idx" + std::to_string(col_idx));
            for (size_t i = 0; i < row_offsets.size(); ++i) {
                if (cache.get(i)) {
                    indexes[col_idx].insert(get_value(i, col_idx), i);
                }
            }
        }
    }

    // Returns the cache statistics followed by the current log size.
    std::string stats() {
        return cache.stats() + ", Log Size: " + std::to_string(log_size) + " bytes";
    }

    // Groups the cached rows by the text value of column `group_col_idx`.
    // Per group the aggregate is the sum of column `agg_col_idx` (sum_agg) or
    // the row count. Each result holds one text value per column - the group
    // value, the aggregate, and for the remaining columns the value from the
    // first row seen in that group - paired with that first row's index.
    // Results are ordered by group value (std::map order).
    std::vector<std::pair<std::vector<std::string>, size_t>> group_by(size_t group_col_idx, size_t agg_col_idx, bool sum_agg = false) {
        std::shared_lock lock(transaction_mutex);
        std::map<std::string, std::pair<int64_t, size_t>> groups; // value -> (sum/count, row_idx)
        for (size_t i = 0; i < row_offsets.size(); ++i) {
            if (!cache.get(i)) continue;
            std::string group_val = get_value(i, group_col_idx);
            int64_t agg_val = sum_agg ? std::stoll(get_value(i, agg_col_idx)) : 1;
            if (groups.count(group_val)) {
                if (sum_agg) groups[group_val].first += agg_val;
                else groups[group_val].first++;
            }
            else {
                groups[group_val] = { agg_val, i };
            }
        }
        std::vector<std::pair<std::vector<std::string>, size_t>> results;
        for (const auto& [val, data] : groups) {
            std::vector<std::string> row;
            for (size_t j = 0; j < columns.size(); ++j) {
                // The group-column check comes first, so it wins when both indexes are equal.
                if (j == group_col_idx) row.push_back(val);
                else if (j == agg_col_idx) row.push_back(std::to_string(data.first));
                else row.push_back(get_value(data.second, j));
            }
            results.emplace_back(row, data.second);
        }
        return results;
    }

    // Sorts `rows` in place by the text in column `sort_col_idx`.
    // The comparison is on std::string, so numbers sort lexicographically.
    void order_by(std::vector<std::pair<std::vector<std::string>, size_t>>& rows, size_t sort_col_idx, bool ascending = true) {
        std::shared_lock lock(transaction_mutex);
        std::sort(rows.begin(), rows.end(),
            [this, sort_col_idx, ascending](const auto& a, const auto& b) {
                std::string val_a = a.first[sort_col_idx];
                std::string val_b = b.first[sort_col_idx];
                return ascending ? val_a < val_b : val_a > val_b;
            });
    }

private:
    // Replays the log file line by line:
    //   INSERT -> load_row(row_idx)
    //   DELETE -> the row's cache entry is set to null and its offset added to free_list
    //   SET    -> set_value(row_idx, col_idx, value)
    // and then rewrites the shard files.
    // NOTE(review): row indexes read from the log are used to index row_offsets
    // without a range check.
    void recover() {
        std::ifstream log(shard_dir + "/" + filename + ".log");
        std::string line;
        while (std::getline(log, line)) {
            std::istringstream iss(line);
            std::string op;
            size_t row_idx;
            iss >> op >> row_idx;
            if (op == "INSERT") {
                load_row(row_idx);
            }
            else if (op == "DELETE") {
                cache.put(row_idx, nullptr);
                free_list.push_back({ row_offsets[row_idx], 0 });
            }
            else if (op == "SET") {
                size_t col_idx;
                std::string value;
                iss >> col_idx >> value;
                set_value(row_idx, col_idx, value);
            }
        }
        write_file();
    }

    // Calls load_row for every row index in row_offsets.
    void load_all_rows() {
        for (size_t i = 0; i < row_offsets.size(); ++i) {
            load_row(i);
        }
    }
};
#include <iostream>
#include <fstream>
#include <vector>
#include <string>
#include <cstdint>
#include <stdexcept>
#include <memory>
#include <immintrin.h>
#include <zlib.h>
#include <thread>
#include <mutex>
#include <list>
#include <map>
#include <set>
#include <shared_mutex>
#include <chrono>
#include <filesystem>
#include <algorithm>

/// Value types a table column can hold.
/// Enumerators are numbered implicitly in declaration order (BOOL = 0 ... TIME = 16).
/// NOTE(review): an identical DataType definition appears earlier in this file;
/// one of the two copies should be removed.
enum class DataType
{
    // Boolean and floating point
    BOOL,
    FLOAT,
    DOUBLE,
    // Signed integers
    INT,
    SHORT,
    BYTE,
    LONG,
    // Unsigned integers
    UINT,
    USHORT,
    UBYTE,
    ULONG,
    // Variable-length payloads
    STRING,
    WSTRING,
    BLOB,
    // Temporal values
    DATETIME,
    DATE,
    TIME
};


struct Column
{
    std::string name;
    DataType type;
    size_t size;
    size_t offset;

    Column(const std::string& n, DataType t) : name(n), type(t), size(get_type_size(t)) {}
};

/// Returns how many bytes a value of the given type occupies inside a row.
/// STRING, WSTRING and BLOB report 4 bytes (HDDUltimateTable stores a uint32_t
/// string-table index in that slot). Unrecognised values yield 0.
/// NOTE(review): an identical get_type_size definition appears earlier in this
/// file; one of the two copies should be removed.
size_t get_type_size(DataType type)
{
    switch (type)
    {
    // 1-byte values
    case DataType::BOOL:
    case DataType::BYTE:
    case DataType::UBYTE:
        return 1;

    // 2-byte values
    case DataType::SHORT:
    case DataType::USHORT:
        return 2;

    // 4-byte values
    case DataType::FLOAT:
    case DataType::INT:
    case DataType::UINT:
    case DataType::STRING:
    case DataType::WSTRING:
    case DataType::BLOB:
    case DataType::DATE:
    case DataType::TIME:
        return 4;

    // 8-byte values
    case DataType::DOUBLE:
    case DataType::LONG:
    case DataType::ULONG:
    case DataType::DATETIME:
        return 8;

    default:
        return 0;
    }
}


struct Row 
{
    std::vector<char> data;
    std::vector<char> compressed;
    bool dirty = false;
    bool is_compressed = false;
    size_t compressed_size = 0;
    bool use_rle = false;

    Row(size_t size) : data(size) {}
    void compress(bool prefer_rle = false) 
    {
        use_rle = prefer_rle && (data.size() > 100 && std::adjacent_find(data.begin(), data.end(), std::not_equal_to<>()) == data.end());
    
        if (use_rle)
        {
            compressed.clear();
            for (size_t i = 0; i < data.size();) 
            {
                uint8_t count = 1;
                while (i + count < data.size() && data[i] == data[i + count] && count < 255) count++;
                compressed.push_back(data[i]);
                compressed.push_back(count);
                i += count;
            }
        }
        else 
        {
            uLongf dest_len = compressBound(data.size());
            compressed.resize(dest_len);
            compress(reinterpret_cast<Bytef*>(compressed.data()), &dest_len,
                reinterpret_cast<const Bytef*>(data.data()), data.size());
            compressed.resize(dest_len);
        }
        compressed_size = compressed.size();
        is_compressed = true;
    }
    void decompress(size_t orig_size) 
    {
        data.resize(orig_size);
        if (use_rle) {
            size_t pos = 0;
            for (size_t i = 0; i < compressed.size() && pos < orig_size; i += 2) {
                char value = compressed[i];
                uint8_t count = compressed[i + 1];
                for (uint8_t j = 0; j < count && pos < orig_size; ++j) {
                    data[pos++] = value;
                }
            }
        }
        else {
            uLongf dest_len = orig_size;
            uncompress(reinterpret_cast<Bytef*>(data.data()), &dest_len,
                reinterpret_cast<const Bytef*>(compressed.data()), compressed.size());
        }
        is_compressed = false;
    }
};

/// Lookup from a column value (as text) to the set of row indexes holding it.
/// Despite the name, the in-memory structure is a std::map of std::set.
/// Every successful mutation rewrites the whole backing file.
///
/// insert/remove/find are guarded by an internal shared mutex. Construction
/// and move operations are not synchronized and must not race with other use.
///
/// NOTE(review): an identical BTreeIndex definition appears earlier in this
/// file; one of the two copies should be removed.
class BTreeIndex {
    std::map<std::string, std::set<size_t>> index;
    mutable std::shared_mutex mutex;  // mutable so the const find() can take a shared lock
    std::string filename;             // empty means "no backing file"

public:
    /// Creates an empty index without a backing file; nothing is persisted.
    /// Required for storage in std::map when accessed through operator[].
    BTreeIndex() = default;

    /// Creates an index backed by `fname`, loading it if the file exists.
    BTreeIndex(const std::string& fname) : filename(fname) {
        if (std::filesystem::exists(fname)) load();
    }

    /// Takes over the entries and file name of `other`; the mutex is not moved.
    BTreeIndex(BTreeIndex&& other) noexcept
        : index(std::move(other.index)), filename(std::move(other.filename)) {}

    /// Replaces this index's entries and file name with those of `other`.
    BTreeIndex& operator=(BTreeIndex&& other) noexcept {
        if (this != &other) {
            index = std::move(other.index);
            filename = std::move(other.filename);
        }
        return *this;
    }

    /// Records that row `row_idx` holds `value` and persists the index.
    void insert(const std::string& value, size_t row_idx) {
        std::unique_lock lock(mutex);
        index[value].insert(row_idx);
        save();
    }

    /// Forgets that row `row_idx` holds `value`. Unknown values or rows are a
    /// no-op; a value whose last row is removed is dropped from the index.
    void remove(const std::string& value, size_t row_idx) {
        std::unique_lock lock(mutex);
        const auto it = index.find(value);
        if (it == index.end()) return;
        if (it->second.erase(row_idx) == 0) return;  // nothing changed, nothing to persist
        if (it->second.empty()) index.erase(it);
        save();
    }

    /// Returns the rows holding `value` in ascending order (empty if none).
    std::vector<size_t> find(const std::string& value) const {
        std::shared_lock lock(mutex);
        const auto it = index.find(value);
        if (it == index.end()) return {};
        return std::vector<size_t>(it->second.begin(), it->second.end());
    }

private:
    /// Writes the whole index to `filename`. Caller must hold the mutex.
    /// Layout: u32 entry count, then per entry: u32 value length, value bytes,
    /// u32 row count, one u64 per row. Best effort: an unopenable file is ignored.
    void save() const {
        if (filename.empty()) return;
        std::ofstream file(filename, std::ios::binary | std::ios::trunc);
        if (!file) return;

        const auto put_u32 = [&file](size_t n) {
            const uint32_t v = static_cast<uint32_t>(n);
            file.write(reinterpret_cast<const char*>(&v), sizeof(v));
        };

        put_u32(index.size());
        for (const auto& [val, rows] : index) {
            put_u32(val.size());
            file.write(val.data(), static_cast<std::streamsize>(val.size()));
            put_u32(rows.size());
            for (const size_t row : rows) {
                // Fixed width so the file does not depend on the platform's size_t
                // (same bytes as before wherever size_t is 64 bits).
                const uint64_t v = row;
                file.write(reinterpret_cast<const char*>(&v), sizeof(v));
            }
        }
    }

    /// Reads entries from `filename` into the index. Loading stops at the first
    /// short or implausible read; entries read completely before that are kept.
    void load() {
        std::ifstream file(filename, std::ios::binary);
        if (!file) return;

        std::error_code ec;
        const auto file_bytes = std::filesystem::file_size(filename, ec);

        uint32_t size = 0;
        if (!file.read(reinterpret_cast<char*>(&size), sizeof(size))) return;
        for (uint32_t i = 0; i < size; ++i) {
            uint32_t val_len = 0;
            if (!file.read(reinterpret_cast<char*>(&val_len), sizeof(val_len))) return;
            // A length larger than the file itself can only come from corruption.
            if (!ec && val_len > file_bytes) return;
            std::string val(val_len, '\0');
            if (!file.read(val.data(), val_len)) return;

            uint32_t row_count = 0;
            if (!file.read(reinterpret_cast<char*>(&row_count), sizeof(row_count))) return;
            std::set<size_t> rows;
            for (uint32_t j = 0; j < row_count; ++j) {
                uint64_t row = 0;
                if (!file.read(reinterpret_cast<char*>(&row), sizeof(row))) return;
                rows.insert(static_cast<size_t>(row));
            }
            index[std::move(val)] = std::move(rows);
        }
    }
};

/// Row cache keyed by row index with least-recently-used eviction.
///
/// A null `Row` may be stored for a key; `get` then returns nullptr exactly as
/// for a missing key, but the lookup is still counted as a hit.
///
/// Locking: every operation that changes state (including `get`, which
/// reorders the LRU list and bumps the counters) takes `mutex` exclusively.
/// Only `stats` takes it shared. `evict` never locks; it runs inside `put`,
/// which already owns the mutex (re-locking a std::shared_mutex from the
/// owning thread is undefined behaviour).
///
/// Pointers returned by `get` are owned by the cache and become dangling once
/// the entry is evicted or replaced.
class LRUCache 
{
    /// A cached row plus its node in `lru_order`, so a hit can be promoted in
    /// O(1) instead of scanning the list.
    struct Entry
    {
        std::unique_ptr<Row> row;
        std::list<size_t>::iterator pos;
    };

    size_t max_size;
    std::unordered_map<size_t, Entry> cache;
    std::list<size_t> lru_order;   // front = most recently used, one node per cached key
    std::shared_mutex mutex;
    size_t hit_count = 0, miss_count = 0;

    /// Drops least-recently-used entries until one more entry fits within
    /// `max_size`. Precondition: the caller holds `mutex` exclusively.
    void evict() 
    {
        while (!lru_order.empty() && cache.size() >= max_size)
        {
            cache.erase(lru_order.back());
            lru_order.pop_back();
        }
    }

public:
    /// @param size maximum number of entries kept before eviction starts.
    LRUCache(size_t size) : max_size(size) {}

    /// Returns the cached row for `row_idx` and marks it most recently used.
    /// @return the row (possibly nullptr if a null row was stored), or nullptr
    ///         on a miss.
    Row* get(size_t row_idx) 
    {
        std::unique_lock lock(mutex);
        const auto it = cache.find(row_idx);
        if (it == cache.end())
        {
            ++miss_count;
            return nullptr;
        }
        ++hit_count;
        // splice relinks the node in place; the stored iterator stays valid.
        lru_order.splice(lru_order.begin(), lru_order, it->second.pos);
        return it->second.row.get();
    }

    /// Stores `row` under `row_idx` as the most recently used entry.
    /// Replacing an existing key reuses its LRU node, so the list never holds
    /// duplicates; inserting a new key first evicts down to `max_size - 1`.
    void put(size_t row_idx, std::unique_ptr<Row> row) 
    {
        std::unique_lock lock(mutex);
        const auto it = cache.find(row_idx);
        if (it != cache.end())
        {
            it->second.row = std::move(row);
            lru_order.splice(lru_order.begin(), lru_order, it->second.pos);
            return;
        }
        evict();
        lru_order.push_front(row_idx);
        cache.emplace(row_idx, Entry{ std::move(row), lru_order.begin() });
    }

    /// Grows the capacity by 10 when the hit rate is below
    /// `hit_rate_threshold`; shrinks it by 5 when the hit rate exceeds the
    /// threshold by more than 0.1 and the capacity is above 50.
    void adjust_size(double hit_rate_threshold) 
    {
        std::unique_lock lock(mutex);   // writes max_size
        // +1 keeps the division defined before any lookup has happened.
        const double hit_rate = hit_count / static_cast<double>(hit_count + miss_count + 1);
        if (hit_rate < hit_rate_threshold) max_size += 10;
        else if (hit_rate > hit_rate_threshold + 0.1 && max_size > 50) max_size -= 5;
    }

    /// @return a one-line summary of hits, misses and the current capacity.
    std::string stats()
    {
        std::shared_lock lock(mutex);
        return "Cache Hit: " + std::to_string(hit_count) + ", Miss: " + std::to_string(miss_count) +
            ", Size: " + std::to_string(max_size);
    }
};

class HDDUltimateTable 
{

public:

    std::vector<Column> columns;
    size_t row_size = 0;
    std::vector<uint64_t> row_offsets;
    std::list<std::pair<uint64_t, size_t>> free_list;
    std::vector<std::string> strings;
    uint32_t next_string_idx = 0;
    LRUCache cache{ 100 };
    std::map<size_t, BTreeIndex> indexes;

private:

    std::string filename;
    std::string shard_dir;
    std::mutex file_mutex;
    std::shared_mutex transaction_mutex;
    bool in_transaction = false;
    std::ofstream log_file;
    std::vector<std::pair<std::string, std::vector<char>>> log_entries;
    size_t log_size = 0;
    double hdd_rpm = 7200;

    /// Interns `str` in the string pool and returns its pool index.
    ///
    /// The index of a newly added string is derived from its real position in
    /// `strings`, and `next_string_idx` is resynchronised to the pool size, so
    /// the returned value cannot drift from the slot the string occupies.
    /// @param type currently unused; every string-like column shares one pool.
    uint32_t get_string_idx(const std::string& str, DataType type) 
    {
        (void)type;
        std::unique_lock lock(file_mutex);
        const auto hit = std::find(strings.begin(), strings.end(), str);
        if (hit != strings.end())
        {
            return static_cast<uint32_t>(hit - strings.begin());
        }
        strings.push_back(str);
        next_string_idx = static_cast<uint32_t>(strings.size());
        return next_string_idx - 1;
    }

    /// Builds the path of one shard file: `<shard_dir>/<filename>_shard<id>.bin`.
    std::string get_shard_file(int shard_id)
    {
        std::string path = shard_dir;
        path += "/";
        path += filename;
        path += "_shard";
        path += std::to_string(shard_id);
        path += ".bin";
        return path;
    }

    void load_row(size_t row_idx) 
    {
        if (cache.get(row_idx)) return;

        int shard_id = row_idx % 2;
        std::ifstream file(get_shard_file(shard_id), std::ios::binary);
        if (!file) throw std::runtime_error("ファイルオープン失敗: " + get_shard_file(shard_id));
        file.seekg(row_offsets[row_idx]);
        auto row = std::make_unique<Row>(row_size);
        uint32_t comp_size;
        bool is_rle;
        file.read(reinterpret_cast<char*>(&comp_size), sizeof(comp_size));
        file.read(reinterpret_cast<char*>(&is_rle), sizeof(is_rle));
        row->compressed.resize(comp_size);
        file.read(row->compressed.data(), comp_size);
        row->decompress(row_size);
        row->use_rle = is_rle;
        cache.put(row_idx, std::move(row));
    }

    void prefetch_rows(size_t start_idx, size_t count) 
    {
        std::thread prefetch_thread([this, start_idx, count]() 
            {
            for (size_t i = start_idx; i < start_idx + count && i < row_offsets.size(); ++i)
            {
                if (!cache.get(i)) load_row(i);
            }
            });
        prefetch_thread.detach();
    }

    /// Rewrites both shard files from the rows currently held in the cache.
    ///
    /// Each shard gets a 32-bit row count, the offsets of its rows (row i goes
    /// to shard i % 2), then one record per cached row at `row_offsets[i]`:
    /// 32-bit payload size, `bool` RLE flag, compressed payload. The size is
    /// written as 32 bits because that is what `load_row` reads and what
    /// `insert` budgets per record; writing the raw `size_t` shifted every
    /// following field.
    ///
    /// NOTE(review): the files are truncated and only rows present in `cache`
    /// are written back, so rows that are not cached are not rewritten.
    /// Confirm whether that is intended.
    ///
    /// @param checkpoint when true and a transaction is open, the log file is
    ///        truncated and the in-memory log is cleared after the shards are
    ///        written.
    /// @throws std::runtime_error if a shard file cannot be opened for writing.
    void write_file(bool checkpoint = false) {
        std::unique_lock lock(file_mutex);
        for (int shard_id = 0; shard_id < 2; ++shard_id) {
            const std::string shard_path = get_shard_file(shard_id);
            std::ofstream file(shard_path, std::ios::binary | std::ios::trunc);
            if (!file) throw std::runtime_error("ファイルオープン失敗: " + shard_path);

            const size_t shard = static_cast<size_t>(shard_id);
            // Shard 0 takes the extra row when the total is odd.
            const uint32_t num_rows = static_cast<uint32_t>(row_offsets.size() / 2 + (row_offsets.size() % 2 > shard));
            file.write(reinterpret_cast<const char*>(&num_rows), sizeof(num_rows));
            for (size_t i = shard; i < row_offsets.size(); i += 2) {
                file.write(reinterpret_cast<const char*>(&row_offsets[i]), sizeof(row_offsets[i]));
            }

            for (size_t i = shard; i < row_offsets.size(); i += 2) {
                Row* row = cache.get(i);
                if (!row) continue;
                if (row->dirty) {
                    const bool use_rle = (hdd_rpm < 5400);
                    row->compress(use_rle);
                }
                const uint32_t stored_size = static_cast<uint32_t>(row->compressed_size);
                file.seekp(row_offsets[i]);
                file.write(reinterpret_cast<const char*>(&stored_size), sizeof(stored_size));
                file.write(reinterpret_cast<const char*>(&row->use_rle), sizeof(row->use_rle));
                file.write(row->compressed.data(), stored_size);
            }
        }
        if (checkpoint && in_transaction) {
            log_file.close();
            log_file.open(shard_dir + "/" + filename + ".log", std::ios::trunc);
            log_entries.clear();
            log_size = 0;
        }
    }

    /// Appends one operation to the transaction log; a no-op outside a
    /// transaction.
    ///
    /// The log line is `<op> <row_idx> <values...>`. For "SET" the row's
    /// current bytes are stored next to the line as the undo image, so the
    /// call has to happen before the row is modified for that image to be the
    /// old data.
    ///
    /// Once the log exceeds 1 MiB a checkpointing `write_file(true)` runs.
    /// That call happens after `file_mutex` is released: `write_file` locks
    /// the same non-recursive mutex, so calling it with the lock held
    /// deadlocked.
    void log_operation(const std::string& op, size_t row_idx = -1, const std::vector<std::string>& values = {}) {
        if (!in_transaction) return;

        std::string line = op + " " + std::to_string(row_idx);
        for (const auto& v : values) {
            line += " ";
            line += v;
        }

        std::vector<char> old_data;
        if (op == "SET" && row_idx != static_cast<size_t>(-1)) {
            Row* row = cache.get(row_idx);
            if (row) old_data = row->data;
        }

        bool needs_checkpoint = false;
        {
            std::unique_lock lock(file_mutex);
            log_entries.push_back({ line, old_data });
            log_file << line << "\n";
            log_size += line.size() + old_data.size();
            needs_checkpoint = log_size > 1024 * 1024;
        }
        if (needs_checkpoint) write_file(true);
    }

public:
    /// Creates the table's shard files under `dir` and opens its log.
    ///
    /// @param fname base name used for the shard, log and index files.
    /// @param dir   directory holding those files; created (including missing
    ///              parents) when absent.
    /// @param rpm   drive speed hint; below 5400 `write_file` asks rows to
    ///              prefer RLE compression.
    ///
    /// Whether a log already exists is decided before the log is opened:
    /// opening in append mode creates the file, which made the old
    /// "exists -> recover" check always true.
    ///
    /// NOTE(review): both shard files are truncated here before `recover()`
    /// runs, so recovery replays the log against empty shards. Confirm the
    /// intended start-up order.
    HDDUltimateTable(const std::string& fname, const std::string& dir = ".", double rpm = 7200)
        : filename(fname), shard_dir(dir), hdd_rpm(rpm) {
        std::filesystem::create_directories(shard_dir);
        for (int shard_id = 0; shard_id < 2; ++shard_id) {
            std::ofstream file(get_shard_file(shard_id), std::ios::binary | std::ios::trunc);
            uint32_t num_rows = 0;
            file.write(reinterpret_cast<const char*>(&num_rows), sizeof(num_rows));
        }
        const std::string log_path = shard_dir + "/" + filename + ".log";
        const bool had_log = std::filesystem::exists(log_path);
        log_file.open(log_path, std::ios::app);
        if (had_log) recover();
    }

    /// Closes the log file if it is still open.
    ~HDDUltimateTable() {
        if (!log_file.is_open()) return;
        log_file.close();
    }

    /// Resets the table to an empty schema and rewrites the shard files.
    /// Clears the columns, row size, row offsets, free list and string pool.
    /// @throws std::runtime_error when called inside a transaction.
    void CreateTable() {
        std::unique_lock guard(transaction_mutex);
        if (in_transaction) {
            throw std::runtime_error("トランザクション中はCREATE TABLE不可");
        }

        // String pool
        next_string_idx = 0;
        strings.clear();
        // Row bookkeeping
        free_list.clear();
        row_offsets.clear();
        // Schema
        row_size = 0;
        columns.clear();

        write_file();
    }

    void AddColumn(const std::string& name, DataType type) {
        std::unique_lock lock(transaction_mutex);
        Column col(name, type);
        col.offset = row_size;
        row_size += col.size;
        columns.push_back(col);
    }

    /// Opens a transaction and discards any in-memory log entries left over.
    /// @throws std::runtime_error if a transaction is already open.
    void begin_transaction() {
        std::unique_lock guard(transaction_mutex);
        if (!in_transaction) {
            in_transaction = true;
            log_entries.clear();
            return;
        }
        throw std::runtime_error("既にトランザクション中");
    }

    /// Ends the open transaction and makes its changes durable.
    ///
    /// The shards are written with `checkpoint = true` while the transaction
    /// flag is still set, because `write_file` only truncates the log in that
    /// state. Previously the flag was cleared first, so the committed
    /// operations stayed in the log file and `recover()` would replay them on
    /// the next start. The flag is cleared only after the write, so a failed
    /// write leaves the transaction open.
    /// @throws std::runtime_error if no transaction is open.
    void commit() {
        std::unique_lock lock(transaction_mutex);
        if (!in_transaction) throw std::runtime_error("トランザクションが開始されていません");
        write_file(true);
        in_transaction = false;
        log_entries.clear();
    }

    /// Undoes the open transaction by walking the in-memory log backwards.
    ///
    /// - INSERT: the row is replaced by a null cache entry and its offset is
    ///   returned to the free list.
    /// - DELETE: the row is reloaded from its shard file.
    /// - SET: the row bytes captured when the entry was logged are restored.
    ///
    /// Entries whose header cannot be parsed, or whose row index is outside
    /// `row_offsets`, are skipped instead of indexing with an uninitialised
    /// value. The final write uses `checkpoint = true` with the transaction
    /// flag still set so the undone operations are also removed from the log
    /// file; otherwise `recover()` would replay them.
    /// @throws std::runtime_error if no transaction is open.
    void rollback() {
        std::unique_lock lock(transaction_mutex);
        if (!in_transaction) throw std::runtime_error("トランザクションが開始されていません");
        for (auto it = log_entries.rbegin(); it != log_entries.rend(); ++it) {
            std::istringstream iss(it->first);
            std::string op;
            size_t row_idx = 0;
            if (!(iss >> op >> row_idx)) continue;
            if (row_idx >= row_offsets.size()) continue;
            if (op == "INSERT") {
                cache.put(row_idx, nullptr);
                free_list.push_back({ row_offsets[row_idx], 0 });
            }
            else if (op == "DELETE") {
                load_row(row_idx);
            }
            else if (op == "SET") {
                Row* row = cache.get(row_idx);
                if (row && !it->second.empty()) {
                    row->data = it->second;
                    row->dirty = true;   // make sure the restored bytes are recompressed
                }
            }
        }
        write_file(true);
        in_transaction = false;
        log_entries.clear();
    }

    /// Inserts one row built from `values` (one string per column, in column
    /// order; extra values are ignored).
    ///
    /// Changes from the previous version:
    /// - too few values are rejected instead of reading past the vector;
    /// - the lock is exclusive, since `row_offsets` and `free_list` are
    ///   modified;
    /// - all values are parsed before any slot is taken or index touched, so a
    ///   parse failure leaves the table unchanged (apart from strings already
    ///   interned);
    /// - indexes receive the row's real index, which differs from
    ///   `row_offsets.size()` when a free slot is reused;
    /// - a free-list entry whose offset is not in `row_offsets` is skipped.
    ///
    /// @throws std::runtime_error if fewer values than columns are supplied.
    /// @throws std::invalid_argument / std::out_of_range from the numeric
    ///         conversions when a value cannot be parsed.
    void insert(const std::vector<std::string>& values) {
        std::unique_lock lock(transaction_mutex);
        if (values.size() < columns.size()) {
            throw std::runtime_error("値の数が列数より少ない");
        }

        auto row = std::make_unique<Row>(row_size);
        for (size_t i = 0; i < columns.size(); ++i) {
            char* ptr = row->data.data() + columns[i].offset;
            switch (columns[i].type) {
            case DataType::BOOL: *ptr = std::stoi(values[i]) != 0; break;
            case DataType::FLOAT: *reinterpret_cast<float*>(ptr) = std::stof(values[i]); break;
            case DataType::DOUBLE: *reinterpret_cast<double*>(ptr) = std::stod(values[i]); break;
            case DataType::INT: *reinterpret_cast<int32_t*>(ptr) = std::stoi(values[i]); break;
            case DataType::SHORT: *reinterpret_cast<int16_t*>(ptr) = std::stoi(values[i]); break;
            case DataType::BYTE: *reinterpret_cast<int8_t*>(ptr) = std::stoi(values[i]); break;
            case DataType::LONG: *reinterpret_cast<int64_t*>(ptr) = std::stoll(values[i]); break;
            case DataType::UINT: *reinterpret_cast<uint32_t*>(ptr) = std::stoul(values[i]); break;
            case DataType::USHORT: *reinterpret_cast<uint16_t*>(ptr) = std::stoul(values[i]); break;
            case DataType::UBYTE: *reinterpret_cast<uint8_t*>(ptr) = std::stoul(values[i]); break;
            case DataType::ULONG: *reinterpret_cast<uint64_t*>(ptr) = std::stoull(values[i]); break;
            case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
                *reinterpret_cast<uint32_t*>(ptr) = get_string_idx(values[i], columns[i].type); break;
            case DataType::DATETIME: *reinterpret_cast<int64_t*>(ptr) = std::stoll(values[i]); break;
            case DataType::DATE: *reinterpret_cast<int32_t*>(ptr) = std::stoi(values[i]); break;
            case DataType::TIME: *reinterpret_cast<int32_t*>(ptr) = std::stoi(values[i]); break;
            }
        }
        row->dirty = true;

        // First free slot that is large enough and still maps to a known row.
        size_t row_idx = row_offsets.size();
        bool reused = false;
        for (auto it = free_list.begin(); it != free_list.end(); ++it) {
            if (it->second < row->compressed_size) continue;
            const auto slot = std::find(row_offsets.begin(), row_offsets.end(), it->first);
            if (slot == row_offsets.end()) continue;
            row_idx = static_cast<size_t>(slot - row_offsets.begin());
            free_list.erase(it);
            reused = true;
            break;
        }
        if (!reused) {
            // Header (row count + offset table) followed by one fixed-size record per row.
            const uint64_t offset = sizeof(uint32_t) + row_offsets.size() * sizeof(uint64_t) + row_offsets.size() * (row_size + sizeof(uint32_t) + sizeof(bool));
            row_offsets.push_back(offset);
        }

        for (size_t i = 0; i < columns.size(); ++i) {
            const auto idx = indexes.find(i);
            if (idx != indexes.end()) idx->second.insert(values[i], row_idx);
        }

        cache.put(row_idx, std::move(row));
        log_operation("INSERT", row_idx, values);
    }

    /// Finds the rows whose column `col_idx` equals `target`.
    ///
    /// When `use_index` is set and the column has an index, the index answers
    /// the query. Otherwise only INT columns are scanned (other types yield an
    /// empty result), eight rows per AVX2 comparison.
    ///
    /// Lanes for rows that could not be loaded are zero-filled and masked out
    /// through `present`, so a target of 0 never matches a missing row and the
    /// tail lanes of the last batch are never read uninitialised.
    ///
    /// @throws std::runtime_error if `col_idx` is not a valid column.
    /// @throws std::invalid_argument / std::out_of_range if `target` is not an
    ///         int when an INT column is scanned.
    std::vector<size_t> search_simd(size_t col_idx, const std::string& target, bool use_index = true) {
        std::shared_lock lock(transaction_mutex);
        if (use_index) {
            const auto idx = indexes.find(col_idx);
            if (idx != indexes.end()) return idx->second.find(target);
        }
        if (col_idx >= columns.size()) throw std::runtime_error("無効な列インデックス");

        std::vector<size_t> results;
        if (columns[col_idx].type != DataType::INT) return results;

        const int32_t target_val = std::stoi(target);
        constexpr size_t SIMD_WIDTH = 8;
        const size_t total = row_offsets.size();
        const size_t col_offset = columns[col_idx].offset;
        prefetch_rows(0, std::min<size_t>(SIMD_WIDTH * 2, total));

        const __m256i target_vec = _mm256_set1_epi32(target_val);   // loop-invariant
        for (size_t base = 0; base < total; base += SIMD_WIDTH) {
            alignas(32) int32_t values[SIMD_WIDTH] = {};
            unsigned present = 0;   // bit j set when lane j holds a real row
            const size_t lanes = std::min<size_t>(SIMD_WIDTH, total - base);

            for (size_t j = 0; j < lanes; ++j) {
                load_row(base + j);
                Row* row = cache.get(base + j);
                if (!row) continue;
                std::memcpy(&values[j], row->data.data() + col_offset, sizeof(int32_t));
                present |= 1u << j;
            }

            const __m256i data_vec = _mm256_load_si256(reinterpret_cast<const __m256i*>(values));
            const __m256i cmp = _mm256_cmpeq_epi32(data_vec, target_vec);
            const unsigned hits = static_cast<unsigned>(_mm256_movemask_ps(_mm256_castsi256_ps(cmp))) & present;

            for (size_t j = 0; j < lanes; ++j) {
                if (hits & (1u << j)) results.push_back(base + j);
            }
        }
        return results;
    }

    void update_all(size_t col_idx, const std::string& new_value) {
        std::shared_lock lock(transaction_mutex);
        if (row_offsets.size() < 1000) {
            for (size_t i = 0; i < row_offsets.size(); ++i) {
                if (cache.get(i)) {
                    std::string old_value = get_value(i, col_idx);
                    set_value(i, col_idx, new_value);
                    if (indexes.count(col_idx)) {
                        indexes[col_idx].remove(old_value, i);
                        indexes[col_idx].insert(new_value, i);
                    }
                }
            }
        }
        else {
            std::cout << "OpenCLで全行更新(擬似)\n";
            for (size_t i = 0; i < row_offsets.size(); ++i) {
                if (cache.get(i)) {
                    std::string old_value = get_value(i, col_idx);
                    set_value(i, col_idx, new_value);
                    if (indexes.count(col_idx)) {
                        indexes[col_idx].remove(old_value, i);
                        indexes[col_idx].insert(new_value, i);
                    }
                }
            }
        }
        log_operation("UPDATE_ALL", col_idx, { new_value });
    }

    /// Returns the value of column `col_idx` in row `row_idx` as text.
    ///
    /// Numeric, date and time columns are formatted with std::to_string.
    /// String-like columns are resolved through the string pool; an index
    /// outside the pool yields an empty string instead of reading past
    /// `strings`.
    /// @return the value, or "DELETED" when the row is not available after
    ///         `load_row`.
    /// @throws std::runtime_error if `col_idx` is not a valid column.
    std::string get_value(size_t row_idx, size_t col_idx) {
        std::shared_lock lock(transaction_mutex);
        if (col_idx >= columns.size()) throw std::runtime_error("無効な列インデックス");
        load_row(row_idx);
        Row* row = cache.get(row_idx);
        if (!row) return "DELETED";
        const char* ptr = row->data.data() + columns[col_idx].offset;
        switch (columns[col_idx].type) {
        case DataType::BOOL: return std::to_string(*reinterpret_cast<const bool*>(ptr));
        case DataType::FLOAT: return std::to_string(*reinterpret_cast<const float*>(ptr));
        case DataType::DOUBLE: return std::to_string(*reinterpret_cast<const double*>(ptr));
        case DataType::INT: return std::to_string(*reinterpret_cast<const int32_t*>(ptr));
        case DataType::SHORT: return std::to_string(*reinterpret_cast<const int16_t*>(ptr));
        case DataType::BYTE: return std::to_string(*reinterpret_cast<const int8_t*>(ptr));
        case DataType::LONG: return std::to_string(*reinterpret_cast<const int64_t*>(ptr));
        case DataType::UINT: return std::to_string(*reinterpret_cast<const uint32_t*>(ptr));
        case DataType::USHORT: return std::to_string(*reinterpret_cast<const uint16_t*>(ptr));
        case DataType::UBYTE: return std::to_string(*reinterpret_cast<const uint8_t*>(ptr));
        case DataType::ULONG: return std::to_string(*reinterpret_cast<const uint64_t*>(ptr));
        case DataType::STRING: case DataType::WSTRING: case DataType::BLOB: {
            const uint32_t string_idx = *reinterpret_cast<const uint32_t*>(ptr);
            if (string_idx >= strings.size()) return "";
            return strings[string_idx];
        }
        case DataType::DATETIME: return std::to_string(*reinterpret_cast<const int64_t*>(ptr));
        case DataType::DATE: return std::to_string(*reinterpret_cast<const int32_t*>(ptr));
        case DataType::TIME: return std::to_string(*reinterpret_cast<const int32_t*>(ptr));
        default: return "";
        }
    }

    /// Parses `value` for column `col_idx` and stores it in row `row_idx`.
    /// Does nothing when the row is not available after `load_row`.
    ///
    /// Order matters here:
    /// 1. the value is parsed into a scratch buffer, so a conversion error
    ///    leaves both the row and the log untouched;
    /// 2. the operation is logged, and `log_operation` captures the row's
    ///    bytes at that moment as the undo image;
    /// 3. only then is the row overwritten.
    /// Logging after the write (as before) recorded the new bytes as the undo
    /// image, so a rollback restored nothing.
    ///
    /// @throws std::runtime_error if `col_idx` is not a valid column.
    /// @throws std::invalid_argument / std::out_of_range when `value` cannot
    ///         be parsed.
    void set_value(size_t row_idx, size_t col_idx, const std::string& value) {
        std::shared_lock lock(transaction_mutex);
        if (col_idx >= columns.size()) throw std::runtime_error("無効な列インデックス");
        load_row(row_idx);
        Row* row = cache.get(row_idx);
        if (!row) return;

        alignas(8) char encoded[8] = {};   // widest column type is 8 bytes
        switch (columns[col_idx].type) {
        case DataType::BOOL: encoded[0] = std::stoi(value) != 0; break;
        case DataType::FLOAT: *reinterpret_cast<float*>(encoded) = std::stof(value); break;
        case DataType::DOUBLE: *reinterpret_cast<double*>(encoded) = std::stod(value); break;
        case DataType::INT: *reinterpret_cast<int32_t*>(encoded) = std::stoi(value); break;
        case DataType::SHORT: *reinterpret_cast<int16_t*>(encoded) = std::stoi(value); break;
        case DataType::BYTE: *reinterpret_cast<int8_t*>(encoded) = std::stoi(value); break;
        case DataType::LONG: *reinterpret_cast<int64_t*>(encoded) = std::stoll(value); break;
        case DataType::UINT: *reinterpret_cast<uint32_t*>(encoded) = std::stoul(value); break;
        case DataType::USHORT: *reinterpret_cast<uint16_t*>(encoded) = std::stoul(value); break;
        case DataType::UBYTE: *reinterpret_cast<uint8_t*>(encoded) = std::stoul(value); break;
        case DataType::ULONG: *reinterpret_cast<uint64_t*>(encoded) = std::stoull(value); break;
        case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
            *reinterpret_cast<uint32_t*>(encoded) = get_string_idx(value, columns[col_idx].type); break;
        case DataType::DATETIME: *reinterpret_cast<int64_t*>(encoded) = std::stoll(value); break;
        case DataType::DATE: *reinterpret_cast<int32_t*>(encoded) = std::stoi(value); break;
        case DataType::TIME: *reinterpret_cast<int32_t*>(encoded) = std::stoi(value); break;
        }

        log_operation("SET", row_idx, { std::to_string(col_idx), value });

        const size_t width = std::min<size_t>(columns[col_idx].size, sizeof(encoded));
        std::copy(encoded, encoded + width, row->data.data() + columns[col_idx].offset);
        row->dirty = true;
    }

    void delete_row(size_t row_idx) {
        std::shared_lock lock(transaction_mutex);
        load_row(row_idx);
        Row* row = cache.get(row_idx);
        if (!row) return;
        free_list.push_back({ row_offsets[row_idx], row->compressed_size });
        for (size_t i = 0; i < columns.size(); ++i) {
            if (indexes.count(i)) indexes[i].remove(get_value(row_idx, i), row_idx);
        }
        cache.put(row_idx, nullptr);
        log_operation("DELETE", row_idx);
    }

    /// Writes the cached rows to the shard files without checkpointing the log.
    void flush() {
        write_file(false);
    }

    /// Prints every row to stdout, one line per row, as `name: value ` pairs.
    ///
    /// The row count and column names are copied under a short shared lock
    /// that is released before `get_value` is called: `get_value` takes its
    /// own shared lock, and re-locking a std::shared_mutex from the owning
    /// thread is undefined behaviour.
    void print() {
        size_t total = 0;
        std::vector<std::string> names;
        {
            std::shared_lock lock(transaction_mutex);
            total = row_offsets.size();
            names.reserve(columns.size());
            for (const auto& col : columns) names.push_back(col.name);
        }
        for (size_t i = 0; i < total; ++i) {
            for (size_t j = 0; j < names.size(); ++j) {
                std::cout << names[j] << ": " << get_value(i, j) << " ";
            }
            std::cout << "\n";
        }
    }

    void create_index(size_t col_idx) {
        std::unique_lock lock(transaction_mutex);
        if (col_idx >= columns.size()) throw std::runtime_error("無効な列インデックス");
        if (!indexes.count(col_idx)) {
            indexes[col_idx] = BTreeIndex(shard_dir + "/" + filename + ".idx" + std::to_string(col_idx));
            for (size_t i = 0; i < row_offsets.size(); ++i) {
                if (cache.get(i)) {
                    indexes[col_idx].insert(get_value(i, col_idx), i);
                }
            }
        }
    }

    /// @return the cache statistics followed by the current log size in bytes.
    std::string stats() {
        std::string report = cache.stats();
        report += ", Log Size: ";
        report += std::to_string(log_size);
        report += " bytes";
        return report;
    }

    /// Groups the cached rows by column `group_col_idx`.
    ///
    /// For each distinct group value the aggregate is the sum of column
    /// `agg_col_idx` (parsed as an integer) when `sum_agg` is true, otherwise
    /// the number of rows in the group.
    ///
    /// @return one entry per group, ordered by group value: the output row
    ///         (group value in the grouping column, aggregate in the
    ///         aggregate column, remaining columns from the group's first
    ///         row) paired with that first row's index.
    ///
    /// The shared lock is held only to read the sizes: `get_value` locks
    /// `transaction_mutex` itself and must not run while this thread already
    /// holds it.
    std::vector<std::pair<std::vector<std::string>, size_t>> group_by(size_t group_col_idx, size_t agg_col_idx, bool sum_agg = false) {
        size_t total = 0;
        size_t column_count = 0;
        {
            std::shared_lock lock(transaction_mutex);
            total = row_offsets.size();
            column_count = columns.size();
        }

        std::map<std::string, std::pair<int64_t, size_t>> groups; // value -> (sum/count, first row_idx)
        for (size_t i = 0; i < total; ++i) {
            if (!cache.get(i)) continue;
            const std::string group_val = get_value(i, group_col_idx);
            const int64_t agg_val = sum_agg ? std::stoll(get_value(i, agg_col_idx)) : 1;
            // One lookup: start a new group, or add to the existing one
            // (agg_val is 1 in count mode, so += counts rows).
            const auto [slot, inserted] = groups.try_emplace(group_val, agg_val, i);
            if (!inserted) slot->second.first += agg_val;
        }

        std::vector<std::pair<std::vector<std::string>, size_t>> results;
        results.reserve(groups.size());
        for (const auto& [val, data] : groups) {
            std::vector<std::string> row;
            row.reserve(column_count);
            for (size_t j = 0; j < column_count; ++j) {
                if (j == group_col_idx) row.push_back(val);
                else if (j == agg_col_idx) row.push_back(std::to_string(data.first));
                else row.push_back(get_value(data.second, j));
            }
            results.emplace_back(row, data.second);
        }
        return results;
    }

    /// Sorts `rows` in place by the text of column `sort_col_idx`, comparing
    /// the strings lexicographically.
    /// @param ascending true for smallest first, false for largest first.
    void order_by(std::vector<std::pair<std::vector<std::string>, size_t>>& rows, size_t sort_col_idx, bool ascending = true) {
        std::shared_lock guard(transaction_mutex);
        const auto by_column = [sort_col_idx, ascending](const auto& lhs, const auto& rhs) {
            const std::string& left = lhs.first[sort_col_idx];
            const std::string& right = rhs.first[sort_col_idx];
            if (ascending) return left < right;
            return left > right;
        };
        std::sort(rows.begin(), rows.end(), by_column);
    }

private:
    void recover() {
        std::ifstream log(shard_dir + "/" + filename + ".log");
        std::string line;
        while (std::getline(log, line)) {
            std::istringstream iss(line);
            std::string op;
            size_t row_idx;
            iss >> op >> row_idx;
            if (op == "INSERT") {
                load_row(row_idx);
            }
            else if (op == "DELETE") {
                cache.put(row_idx, nullptr);
                free_list.push_back({ row_offsets[row_idx], 0 });
            }
            else if (op == "SET") {
                size_t col_idx;
                std::string value;
                iss >> col_idx >> value;
                set_value(row_idx, col_idx, value);
            }
        }
        write_file();
    }

    /// Calls `load_row` for every index in `row_offsets`, in ascending order.
    void load_all_rows() {
        size_t next = 0;
        while (next < row_offsets.size()) {
            load_row(next);
            ++next;
        }
    }
};
#include <vector>
#include <map>
#include <string>
#include <iostream>
#include <vector>
#include <string>
#include <unordered_map>
#include <memory>

#include "HddTable.h"

class SQLProcessor
{

private:
    
    std::map<std::string, HddTable> tables;
    std::mutex db_mutex;

    /// Splits `sql` on whitespace.
    /// @param error_pos receives the sum of (token length + 1) over all
    ///        tokens, which callers use as an approximate position in error
    ///        messages.
    /// @return the tokens in order of appearance.
    std::vector<std::string> tokenize(const std::string& sql, size_t& error_pos) 
    {
        std::vector<std::string> words;
        std::stringstream input(sql);
        size_t consumed = 0;
        for (std::string word; input >> word; )
        {
            consumed += word.length() + 1;
            words.push_back(word);
        }
        error_pos = consumed;
        return words;
    }

    void create_table(const std::vector<std::string>& tokens, size_t error_pos) {
        std::unique_lock lock(db_mutex);
        if (tokens.size() < 4 || tokens[0] != "CREATE" || tokens[1] != "TABLE") {
            throw std::runtime_error("無効なCREATE TABLE文 at " + std::to_string(error_pos));
        }
        std::string table_name = tokens[2];
        if (tables.count(table_name)) {
            throw std::runtime_error("テーブルが既に存在します: " + table_name);
        }

        tables[table_name] = HddTable(table_name + ".bin");
        tables[table_name].CreateTable();

        size_t i = 3;
        if (tokens[i] != "(") throw std::runtime_error("括弧がありません at " + std::to_string(error_pos + i));
        i++;
        while (i < tokens.size() && tokens[i] != ")") {
            std::string col_name = tokens[i++];
            std::string type_str = tokens[i++];
            DataType type;
            if (type_str == "INT") type = DataType::INT;
            else if (type_str == "FLOAT") type = DataType::FLOAT;
            else if (type_str == "STRING") type = DataType::STRING;
            else if (type_str == "BOOL") type = DataType::BOOL;
            else throw std::runtime_error("未知のデータ型: " + type_str + " at " + std::to_string(error_pos + i));
            tables[table_name].AddColumn(col_name, type);
            if (i < tokens.size() && tokens[i] == ",") i++;
        }
    }

    /// Handles `DELETE TABLE <name>`: unregisters the table and removes
    /// `<name>.bin` and `<name>.bin.log` from the working directory.
    /// @throws std::runtime_error on a malformed statement or unknown table.
    void delete_table(const std::vector<std::string>& tokens, size_t error_pos) {
        std::unique_lock guard(db_mutex);
        const bool well_formed = tokens.size() == 3 && tokens[0] == "DELETE" && tokens[1] == "TABLE";
        if (!well_formed) {
            throw std::runtime_error("無効なDELETE TABLE文 at " + std::to_string(error_pos));
        }

        const std::string table_name = tokens[2];
        if (tables.count(table_name) == 0) {
            throw std::runtime_error("テーブルが存在しません: " + table_name);
        }
        tables.erase(table_name);

        const std::string data_path = table_name + ".bin";
        const std::string log_path = data_path + ".log";
        std::remove(data_path.c_str());
        std::remove(log_path.c_str());
    }

    void select(const std::vector<std::string>& tokens, size_t error_pos) {
        std::shared_lock lock(db_mutex);
        if (tokens.size() < 4 || tokens[0] != "SELECT" || tokens[2] != "FROM") {
            throw std::runtime_error("無効なSELECT文 at " + std::to_string(error_pos));
        }
        std::string table_name = tokens[3];
        if (!tables.count(table_name)) {
            throw std::runtime_error("テーブルが存在しません: " + table_name);
        }

        HddTable& table = tables[table_name];
        size_t join_idx = std::find(tokens.begin(), tokens.end(), "JOIN") - tokens.begin();
        if (join_idx < tokens.size()) {
            bool is_left = (tokens[join_idx - 1] == "LEFT");
            std::string join_table = tokens[join_idx + 1];
            if (!tables.count(join_table)) {
                throw std::runtime_error("結合テーブルが存在しません: " + join_table);
            }
            size_t on_idx = std::find(tokens.begin(), tokens.end(), "ON") - tokens.begin();
            if (on_idx >= tokens.size() || tokens.size() < on_idx + 4) {
                throw std::runtime_error("ON条件が不正 at " + std::to_string(error_pos + on_idx));
            }
            std::string on_col1 = tokens[on_idx + 1];
            std::string on_col2 = tokens[on_idx + 3];
            size_t col1_idx = -1, col2_idx = -1;
            for (size_t i = 0; i < table.columns.size(); ++i) {
                if (table.columns[i].name == on_col1) col1_idx = i;
            }
            HddTable& join_table_ref = tables[join_table];
            for (size_t i = 0; i < join_table_ref.columns.size(); ++i) {
                if (join_table_ref.columns[i].name == on_col2) col2_idx = i;
            }
            if (col1_idx == -1 || col2_idx == -1) {
                throw std::runtime_error("結合列が見つかりません");
            }
            for (size_t i = 0; i < table.row_offsets.size(); ++i) {
                if (!table.cache.get(i) && !is_left) continue;
                std::string val1 = table.get_value(i, col1_idx);
                auto matches = join_table_ref.search_simd(col2_idx, val1);
                if (matches.empty() && is_left) {
                    for (size_t k = 0; k < table.columns.size(); ++k) {
                        std::cout << table.columns[k].name << ": " << table.get_value(i, k) << " ";
                    }
                    for (size_t k = 0; k < join_table_ref.columns.size(); ++k) {
                        std::cout << join_table_ref.columns[k].name << ": NULL ";
                    }
                    std::cout << "\n";
                }
                for (auto j : matches) {
                    for (size_t k = 0; k < table.columns.size(); ++k) {
                        std::cout << table.columns[k].name << ": " << table.get_value(i, k) << " ";
                    }
                    for (size_t k = 0; k < join_table_ref.columns.size(); ++k) {
                        std::cout << join_table_ref.columns[k].name << ": " << join_table_ref.get_value(j, k) << " ";
                    }
                    std::cout << "\n";
                }
            }
        }
        else {
            size_t where_idx = std::find(tokens.begin(), tokens.end(), "WHERE") - tokens.begin();
            if (where_idx < tokens.size()) {
                if (tokens.size() < where_idx + 4 || tokens[where_idx + 2] != "=") {
                    throw std::runtime_error("無効なWHERE条件 at " + std::to_string(error_pos + where_idx));
                }
                std::string col_name = tokens[where_idx + 1];
                std::string value = tokens[where_idx + 3];
                size_t col_idx = -1;
                for (size_t i = 0; i < table.columns.size(); ++i) {
                    if (table.columns[i].name == col_name) {
                        col_idx = i;
                        break;
                    }
                }
                if (col_idx == -1) throw std::runtime_error("列が見つかりません: " + col_name);
                auto results = table.search_simd(col_idx, value);
                for (auto idx : results) {
                    for (size_t j = 0; j < table.columns.size(); ++j) {
                        std::cout << table.columns[j].name << ": " << table.get_value(idx, j) << " ";
                    }
                    std::cout << "\n";
                }
            }
            else {
                table.print();
            }
        }
    }

    void insert(const std::vector<std::string>& tokens, size_t error_pos) {
        std::shared_lock lock(db_mutex);
        if (tokens.size() < 5 || tokens[0] != "INSERT" || tokens[1] != "INTO" || tokens[3] != "VALUES") {
            throw std::runtime_error("無効なINSERT文 at " + std::to_string(error_pos));
        }
        std::string table_name = tokens[2];
        if (!tables.count(table_name)) {
            throw std::runtime_error("テーブルが存在しません: " + table_name);
        }

        HddTable& table = tables[table_name];
        std::vector<std::string> values;
        size_t i = 4;
        if (tokens[i] != "(") throw std::runtime_error("括弧がありません at " + std::to_string(error_pos + i));
        i++;
        while (i < tokens.size() && tokens[i] != ")") {
            values.push_back(tokens[i]);
            i++;
            if (i < tokens.size() && tokens[i] == ",") i++;
        }
        table.insert(values);
    }

    void update(const std::vector<std::string>& tokens, size_t error_pos) {
        std::shared_lock lock(db_mutex);
        if (tokens.size() < 6 || tokens[0] != "UPDATE" || tokens[2] != "SET") {
            throw std::runtime_error("無効なUPDATE文 at " + std::to_string(error_pos));
        }
        std::string table_name = tokens[1];
        if (!tables.count(table_name)) {
            throw std::runtime_error("テーブルが存在しません: " + table_name);
        }

        HddTable& table = tables[table_name];
        std::string col_name = tokens[3];
        if (tokens[4] != "=") throw std::runtime_error("SET構文エラー at " + std::to_string(error_pos + 4));
        std::string value = tokens[5];
        size_t col_idx = -1;
        for (size_t i = 0; i < table.columns.size(); ++i) {
            if (table.columns[i].name == col_name) {
                col_idx = i;
                break;
            }
        }
        if (col_idx == -1) throw std::runtime_error("列が見つかりません: " + col_name);

        size_t where_idx = std::find(tokens.begin(), tokens.end(), "WHERE") - tokens.begin();
        if (where_idx < tokens.size()) {
            std::string where_col = tokens[where_idx + 1];
            std::string where_value = tokens[where_idx + 3];
            size_t where_col_idx = -1;
            for (size_t i = 0; i < table.columns.size(); ++i) {
                if (table.columns[i].name == where_col) {
                    where_col_idx = i;
                    break;
                }
            }
            if (where_col_idx == -1) throw std::runtime_error("WHERE列が見つかりません: " + where_col);
            auto rows = table.search_simd(where_col_idx, where_value);
            for (auto row_idx : rows) {
                table.set_value(row_idx, col_idx, value);
            }
        }
        else {
            table.update_all(col_idx, value);
        }
        table.flush();
    }

    // Executes `DELETE FROM <table> [WHERE <column> <op> <value>]`.
    //
    // With a WHERE clause, every row returned by search_simd(column, value) is
    // passed to delete_row; the operator token between the column and the value
    // is not inspected here. Without a WHERE clause, delete_row is called for
    // each index of row_offsets. The table is flushed afterwards in both cases.
    //
    // tokens:    tokenized statement; tokens[0..2] must be DELETE, FROM, <table>.
    // error_pos: position reported in syntax-error messages.
    // Throws std::runtime_error for a malformed statement, an unknown table,
    // a truncated WHERE clause, or an unknown WHERE column.
    void delete_rows(const std::vector<std::string>& tokens, size_t error_pos) {
        // NOTE(review): only a shared lock is taken although the table is modified
        // below — confirm HddTable synchronizes its own writes.
        std::shared_lock lock(db_mutex);
        if (tokens.size() < 3 || tokens[0] != "DELETE" || tokens[1] != "FROM") {
            throw std::runtime_error("無効なDELETE文 at " + std::to_string(error_pos));
        }
        const std::string& table_name = tokens[2];
        if (!tables.count(table_name)) {
            throw std::runtime_error("テーブルが存在しません: " + table_name);
        }

        HddTable& table = tables[table_name];
        const size_t where_idx = std::find(tokens.begin(), tokens.end(), "WHERE") - tokens.begin();
        if (where_idx < tokens.size()) {
            // The clause needs three tokens after WHERE: column, operator, value.
            // Without this check a truncated clause would index past the end of tokens.
            if (where_idx + 3 >= tokens.size()) {
                throw std::runtime_error("WHERE構文エラー at " + std::to_string(error_pos + where_idx));
            }
            const std::string& col_name = tokens[where_idx + 1];
            const std::string& value = tokens[where_idx + 3];

            // Sentinel meaning "no column with that name".
            constexpr size_t kNotFound = static_cast<size_t>(-1);
            size_t col_idx = kNotFound;
            for (size_t i = 0; i < table.columns.size(); ++i) {
                if (table.columns[i].name == col_name) {
                    col_idx = i;
                    break;
                }
            }
            if (col_idx == kNotFound) throw std::runtime_error("列が見つかりません: " + col_name);

            // NOTE(review): assumes delete_row does not renumber the remaining rows;
            // otherwise the indices collected here go stale — verify.
            auto rows = table.search_simd(col_idx, value);
            for (auto row_idx : rows) {
                table.delete_row(row_idx);
            }
        }
        else {
            // No WHERE clause: delete every row.
            // NOTE(review): same index-stability assumption as above; if delete_row
            // shrinks row_offsets this loop would skip rows — verify.
            for (size_t i = 0; i < table.row_offsets.size(); ++i) {
                table.delete_row(i);
            }
        }
        table.flush();
    }

    // Executes `CREATE INDEX <name> ON <table> <column-token>`.
    //
    // tokens[2] (the index name) is not inspected. The column name is tokens[5]
    // with its first and last character removed — presumably the surrounding
    // parentheses of "(col)"; confirm against the tokenizer.
    //
    // tokens:    tokenized statement, at least six tokens long.
    // error_pos: position reported in syntax-error messages.
    // Throws std::runtime_error for a malformed statement, an unknown table,
    // or an unknown column.
    void create_index(const std::vector<std::string>& tokens, size_t error_pos) {
        // NOTE(review): only a shared lock is taken although an index is built on
        // the table — confirm HddTable synchronizes its own writes.
        std::shared_lock lock(db_mutex);
        // Six tokens are required because tokens[5] is read below
        // (the previous `< 5` guard allowed an out-of-range access).
        if (tokens.size() < 6 || tokens[0] != "CREATE" || tokens[1] != "INDEX" || tokens[3] != "ON") {
            throw std::runtime_error("無効なCREATE INDEX文 at " + std::to_string(error_pos));
        }
        const std::string& table_name = tokens[4];
        if (!tables.count(table_name)) {
            throw std::runtime_error("テーブルが存在しません: " + table_name);
        }

        const std::string& col_token = tokens[5];
        // Need at least the two delimiter characters; otherwise size() - 2 would
        // wrap around (or substr(1) would throw on an empty token).
        if (col_token.size() < 2) {
            throw std::runtime_error("無効なCREATE INDEX文 at " + std::to_string(error_pos));
        }
        const std::string col_name = col_token.substr(1, col_token.size() - 2);

        HddTable& table = tables[table_name];
        // Sentinel meaning "no column with that name".
        constexpr size_t kNotFound = static_cast<size_t>(-1);
        size_t col_idx = kNotFound;
        for (size_t i = 0; i < table.columns.size(); ++i) {
            if (table.columns[i].name == col_name) {
                col_idx = i;
                break;
            }
        }
        if (col_idx == kNotFound) throw std::runtime_error("列が見つかりません: " + col_name);
        table.create_index(col_idx);
    }

public:
    void execute(const std::string& sql) {
        size_t error_pos;
        std::vector<std::string> tokens = tokenize(sql, error_pos);
        if (tokens.empty()) return;

        try {
            if (tokens[0] == "CREATE" && tokens[1] == "TABLE") {
                create_table(tokens, error_pos);
            }
            else if (tokens[0] == "DELETE" && tokens[1] == "TABLE") {
                delete_table(tokens, error_pos);
            }
            else if (tokens[0] == "SELECT") {
                select(tokens, error_pos);
            }
            else if (tokens[0] == "INSERT") {
                insert(tokens, error_pos);
            }
            else if (tokens[0] == "UPDATE") {
                update(tokens, error_pos);
            }
            else if (tokens[0] == "DELETE") {
                delete_rows(tokens, error_pos);
            }
            else if (tokens[0] == "CREATE" && tokens[1] == "INDEX") {
                create_index(tokens, error_pos);
            }
            else if (tokens[0] == "BEGIN") {
                tables[tokens[1]].begin_transaction();
            }
            else if (tokens[0] == "COMMIT") {
                tables[tokens[1]].commit();
            }
            else if (tokens[0] == "ROLLBACK") {
                tables[tokens[1]].rollback();
            }
            else if (tokens[0] == "SHOW" && tokens[1] == "STATS") {
                std::cout << tables[tokens[2]].stats() << "\n";
            }
            else {
                throw std::runtime_error("未知のSQLコマンド: " + tokens[0] + " at " + std::to_string(error_pos));
            }
        }
        catch (const std::exception& e) {
            std::cerr << "エラー: " << e.what() << "\n";
        }
    }

    // Interactive loop: shows the "SQL> " prompt, reads one line from standard
    // input and passes it to execute(). Stops at end of input or when the line
    // is exactly "EXIT".
    void run_cli() {
        const auto show_prompt = [] { std::cout << "SQL> "; };

        show_prompt();
        // The prompt is re-printed only after a statement has been executed,
        // so nothing is printed once EXIT has been entered.
        for (std::string line; std::getline(std::cin, line); show_prompt()) {
            if (line == "EXIT") {
                break;
            }
            execute(line);
        }
    }
};

いいなと思ったら応援しよう!

コメント

ログイン または 会員登録 するとコメントできます。
DBの思案|古井和雄
word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word word

mmMwWLliI0fiflO&1
mmMwWLliI0fiflO&1
mmMwWLliI0fiflO&1
mmMwWLliI0fiflO&1
mmMwWLliI0fiflO&1
mmMwWLliI0fiflO&1
mmMwWLliI0fiflO&1