#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include <immintrin.h>
#include <zlib.h>
/// Logical column types a table column can have.
enum class DataType
{
    BOOL, FLOAT, DOUBLE, INT, SHORT, BYTE, LONG, UINT, USHORT, UBYTE, ULONG,
    STRING, WSTRING, BLOB, DATETIME, DATE, TIME
};

/// Returns the number of bytes a value of `type` occupies inside a row buffer.
///
/// STRING/WSTRING/BLOB report 4 because the row stores a 32-bit value for
/// them rather than the payload itself. Unknown enumerator values yield 0.
///
/// Defined before `Column` because `Column`'s constructor calls it; a
/// namespace-scope function must be declared before its point of use.
size_t get_type_size(DataType type)
{
    switch (type)
    {
    case DataType::BOOL: return 1;
    case DataType::FLOAT: return 4;
    case DataType::DOUBLE: return 8;
    case DataType::INT: case DataType::UINT: return 4;
    case DataType::SHORT: case DataType::USHORT: return 2;
    case DataType::BYTE: case DataType::UBYTE: return 1;
    case DataType::LONG: case DataType::ULONG: return 8;
    case DataType::STRING: case DataType::WSTRING: case DataType::BLOB: return 4;
    case DataType::DATETIME: return 8;
    case DataType::DATE: return 4;
    case DataType::TIME: return 4;
    default: return 0;
    }
}

/// Schema entry describing one column of a row.
struct Column
{
    std::string name;   ///< Column name.
    DataType type;      ///< Logical type of the stored value.
    size_t size;        ///< Width in bytes, derived from `type`.
    size_t offset = 0;  ///< Byte offset inside the row buffer; assigned by the owner.

    /// @param n column name
    /// @param t column type; `size` is derived from it via get_type_size()
    Column(const std::string& n, DataType t) : name(n), type(t), size(get_type_size(t)) {}
};
struct Row
{
std ::vector <char > data;
std ::vector <char > compressed;
bool dirty = false ;
bool is_compressed = false ;
size_t compressed_size = 0 ;
bool use_rle = false ;
Row(size_t size) : data(size) {}
void compress (bool prefer_rle = false )
{
use_rle = prefer_rle && (data.size() > 100 && std ::adjacent_find(data.begin(), data.end(), std ::not_equal_to<>()) == data.end());
if (use_rle)
{
compressed.clear();
for (size_t i = 0 ; i < data.size();)
{
uint8_t count = 1 ;
while (i + count < data.size() && data[i] == data[i + count] && count < 255 ) count++;
compressed.push_back(data[i]);
compressed.push_back(count);
i += count;
}
}
else
{
uLongf dest_len = compressBound(data.size());
compressed.resize(dest_len);
compress(reinterpret_cast <Bytef*>(compressed.data()), &dest_len,
reinterpret_cast <const Bytef*>(data.data()), data.size());
compressed.resize(dest_len);
}
compressed_size = compressed.size();
is_compressed = true ;
}
void decompress (size_t orig_size)
{
data.resize(orig_size);
if (use_rle) {
size_t pos = 0 ;
for (size_t i = 0 ; i < compressed.size() && pos < orig_size; i += 2 ) {
char value = compressed[i];
uint8_t count = compressed[i + 1 ];
for (uint8_t j = 0 ; j < count && pos < orig_size; ++j) {
data[pos++] = value;
}
}
}
else {
uLongf dest_len = orig_size;
uncompress(reinterpret_cast <Bytef*>(data.data()), &dest_len,
reinterpret_cast <const Bytef*>(compressed.data()), compressed.size());
}
is_compressed = false ;
}
};
/// Value -> row-index lookup table backed by a file.
///
/// Every mutation rewrites the whole backing file. The file layout is:
/// entry count (uint32), then per entry: value length (uint32), value bytes,
/// row count (uint32), row indexes (size_t each).
class BTreeIndex {
    std::map<std::string, std::set<size_t>> index;
    // `mutable` so the const find() can take a shared lock.
    mutable std::shared_mutex mutex;
    std::string filename;

public:
    /// Creates an empty index with no backing file (nothing is persisted).
    BTreeIndex() = default;

    /// Creates an index backed by `fname`, loading it if the file exists.
    BTreeIndex(const std::string& fname) : filename(fname) {
        if (std::filesystem::exists(fname)) load();
    }

    BTreeIndex(const BTreeIndex&) = delete;
    BTreeIndex& operator=(const BTreeIndex&) = delete;

    /// Takes over the contents and backing file name of `other`.
    /// The mutex itself is not movable, so each object keeps its own.
    BTreeIndex(BTreeIndex&& other) {
        std::unique_lock lock(other.mutex);
        index = std::move(other.index);
        filename = std::move(other.filename);
    }

    BTreeIndex& operator=(BTreeIndex&& other) {
        if (this != &other) {
            std::scoped_lock lock(mutex, other.mutex);
            index = std::move(other.index);
            filename = std::move(other.filename);
        }
        return *this;
    }

    /// Associates `row_idx` with `value` and persists the index.
    void insert(const std::string& value, size_t row_idx) {
        std::unique_lock lock(mutex);
        index[value].insert(row_idx);
        save();
    }

    /// Removes the association of `row_idx` with `value`, if present.
    /// A value whose last row is removed is dropped from the index.
    void remove(const std::string& value, size_t row_idx) {
        std::unique_lock lock(mutex);
        const auto it = index.find(value);
        if (it == index.end()) return;
        if (it->second.erase(row_idx) == 0) return;
        if (it->second.empty()) index.erase(it);
        save();
    }

    /// Returns the row indexes recorded for `value` in ascending order
    /// (empty when the value is unknown).
    std::vector<size_t> find(const std::string& value) const {
        std::shared_lock lock(mutex);
        const auto it = index.find(value);
        if (it == index.end()) return {};
        return std::vector<size_t>(it->second.begin(), it->second.end());
    }

private:
    /// Rewrites the backing file from the in-memory map. Caller holds `mutex`.
    void save() {
        if (filename.empty()) return;
        std::ofstream file(filename, std::ios::binary | std::ios::trunc);
        if (!file) return;
        const auto write_u32 = [&file](uint32_t v) {
            file.write(reinterpret_cast<const char*>(&v), sizeof(v));
        };
        write_u32(static_cast<uint32_t>(index.size()));
        for (const auto& [val, rows] : index) {
            write_u32(static_cast<uint32_t>(val.size()));
            file.write(val.data(), static_cast<std::streamsize>(val.size()));
            write_u32(static_cast<uint32_t>(rows.size()));
            for (const size_t row : rows) {
                file.write(reinterpret_cast<const char*>(&row), sizeof(row));
            }
        }
    }

    /// Reads the backing file into the in-memory map.
    /// Loading stops at the first short read or implausible length, keeping
    /// the entries read so far, so a truncated or corrupt file cannot trigger
    /// a huge allocation or an unbounded loop.
    void load() {
        std::ifstream file(filename, std::ios::binary);
        if (!file) return;
        std::error_code ec;
        const auto file_bytes = std::filesystem::file_size(filename, ec);
        if (ec) return;
        const auto read_u32 = [&file](uint32_t& out) {
            return static_cast<bool>(file.read(reinterpret_cast<char*>(&out), sizeof(out)));
        };
        uint32_t size = 0;
        if (!read_u32(size)) return;
        for (uint32_t i = 0; i < size; ++i) {
            uint32_t val_len = 0;
            if (!read_u32(val_len) || val_len > file_bytes) return;
            std::string val(val_len, '\0');
            if (!file.read(val.data(), val_len)) return;
            uint32_t row_count = 0;
            if (!read_u32(row_count) || row_count > file_bytes / sizeof(size_t)) return;
            std::set<size_t> rows;
            for (uint32_t j = 0; j < row_count; ++j) {
                size_t row = 0;
                if (!file.read(reinterpret_cast<char*>(&row), sizeof(row))) return;
                rows.insert(row);
            }
            index[std::move(val)] = std::move(rows);
        }
    }
};
/// Least-recently-used cache of rows keyed by row index.
///
/// A null row may be stored for a key; get() then returns nullptr for it,
/// exactly as it does for an absent key (but it is counted as a hit).
///
/// NOTE(review): get() hands out a raw pointer into the cache; a later put()
/// that replaces or evicts that entry invalidates the pointer.
class LRUCache
{
    struct Entry
    {
        std::unique_ptr<Row> row;
        std::list<size_t>::iterator pos;  // position of the key in lru_order
    };

    size_t max_size;
    std::unordered_map<size_t, Entry> cache;
    std::list<size_t> lru_order;  // most recently used key at the front
    std::shared_mutex mutex;
    size_t hit_count = 0, miss_count = 0;

    /// Drops least-recently-used entries until at most `max_size` remain.
    /// The caller must already hold `mutex` exclusively; locking here again
    /// would self-deadlock because the mutex is not recursive.
    void evict()
    {
        while (cache.size() > max_size && !lru_order.empty())
        {
            const size_t oldest = lru_order.back();
            lru_order.pop_back();
            cache.erase(oldest);
        }
    }

public:
    LRUCache(size_t size) : max_size(size) {}

    /// Returns the row stored for `row_idx` (possibly nullptr) and marks the
    /// key as most recently used; returns nullptr when the key is absent.
    Row* get(size_t row_idx)
    {
        // Exclusive lock: a lookup updates the counters and the LRU order.
        std::unique_lock lock(mutex);
        const auto it = cache.find(row_idx);
        if (it == cache.end())
        {
            ++miss_count;
            return nullptr;
        }
        ++hit_count;
        lru_order.splice(lru_order.begin(), lru_order, it->second.pos);
        return it->second.row.get();
    }

    /// Stores `row` under `row_idx`, replacing any previous row for that key,
    /// then evicts least-recently-used entries beyond the size limit.
    void put(size_t row_idx, std::unique_ptr<Row> row)
    {
        std::unique_lock lock(mutex);
        const auto it = cache.find(row_idx);
        if (it != cache.end())
        {
            // Reuse the existing LRU node so a key never appears twice.
            it->second.row = std::move(row);
            lru_order.splice(lru_order.begin(), lru_order, it->second.pos);
            return;
        }
        lru_order.push_front(row_idx);
        cache.emplace(row_idx, Entry{ std::move(row), lru_order.begin() });
        evict();
    }

    /// Grows the size limit by 10 when the hit rate is below the threshold,
    /// or shrinks it by 5 (never below 50) when the hit rate exceeds the
    /// threshold by more than 0.1.
    void adjust_size(double hit_rate_threshold)
    {
        std::unique_lock lock(mutex);
        const double hit_rate = hit_count / static_cast<double>(hit_count + miss_count + 1);
        if (hit_rate < hit_rate_threshold) max_size += 10;
        else if (hit_rate > hit_rate_threshold + 0.1 && max_size > 50) max_size -= 5;
    }

    /// Returns a human-readable summary of hits, misses and the size limit.
    std::string stats()
    {
        std::shared_lock lock(mutex);
        return "Cache Hit: " + std::to_string(hit_count) + ", Miss: " + std::to_string(miss_count) +
            ", Size: " + std::to_string(max_size);
    }
};
class HDDUltimateTable
{
public :
// Column schema, in declaration order.
std ::vector <Column> columns;
// Size in bytes of one uncompressed row (sum of the column sizes).
size_t row_size = 0 ;
// Byte offset of each row's record inside its shard file, indexed by row index.
std ::vector <uint64_t > row_offsets;
// Reusable record slots as (file offset, size) pairs.
std ::list <std ::pair<uint64_t , size_t >> free_list;
// String table; string-like columns store an index into this vector.
std ::vector <std ::string > strings;
// Counter used when handing out string-table indexes.
uint32_t next_string_idx = 0 ;
// In-memory row cache with an initial size limit of 100.
LRUCache cache{ 100 };
// Per-column indexes, keyed by column index.
std ::map <size_t , BTreeIndex> indexes;
private :
// Base name used to build shard, log and index file names.
std ::string filename;
// Directory that holds the shard, log and index files.
std ::string shard_dir;
// Guards the string table, the log state and shard-file writes.
std ::mutex file_mutex;
// Taken by the public operations; exclusively by the transaction control calls.
std ::shared_mutex transaction_mutex;
// True between begin_transaction() and commit()/rollback().
bool in_transaction = false ;
// Append-only text log of the operations of the current transaction.
std ::ofstream log_file;
// In-memory copy of the log: (log line, row bytes captured for SET entries).
std ::vector <std ::pair<std ::string , std ::vector <char >>> log_entries;
// Accumulated size of the logged lines and captured row bytes.
size_t log_size = 0 ;
// Drive speed hint; values below 5400 make write_file() prefer RLE encoding.
double hdd_rpm = 7200 ;
/// Returns the string-table index of `str`, appending it if it is not yet present.
///
/// The index is derived from the position inside `strings`, so it stays
/// correct even if `next_string_idx` and the table were to get out of step;
/// `next_string_idx` is kept equal to the table size.
///
/// @param str  string to intern
/// @param type column type of the caller (currently unused)
uint32_t get_string_idx(const std::string& str, DataType type)
{
    (void)type;
    std::unique_lock lock(file_mutex);
    const auto hit = std::find(strings.begin(), strings.end(), str);
    if (hit != strings.end())
    {
        return static_cast<uint32_t>(hit - strings.begin());
    }
    strings.push_back(str);
    next_string_idx = static_cast<uint32_t>(strings.size());
    return next_string_idx - 1;
}
/// Builds the path of the shard file with the given id:
/// "<shard_dir>/<filename>_shard<id>.bin".
std::string get_shard_file(int shard_id)
{
    std::string path = shard_dir;
    path += "/";
    path += filename;
    path += "_shard";
    path += std::to_string(shard_id);
    path += ".bin";
    return path;
}
void load_row (size_t row_idx)
{
if (cache.get(row_idx)) return ;
int shard_id = row_idx % 2 ;
std ::ifstream file (get_shard_file(shard_id), std ::ios::binary) ;
if (!file) throw std ::runtime_error("ファイルオープン失敗: " + get_shard_file(shard_id));
file.seekg(row_offsets[row_idx]);
auto row = std ::make_unique<Row>(row_size);
uint32_t comp_size;
bool is_rle;
file.read(reinterpret_cast <char *>(&comp_size), sizeof (comp_size));
file.read(reinterpret_cast <char *>(&is_rle), sizeof (is_rle));
row->compressed.resize(comp_size);
file.read(row->compressed.data(), comp_size);
row->decompress(row_size);
row->use_rle = is_rle;
cache.put(row_idx, std ::move(row));
}
void prefetch_rows (size_t start_idx, size_t count)
{
std ::thread prefetch_thread ([this , start_idx, count]()
{
for (size_t i = start_idx; i < start_idx + count && i < row_offsets.size(); ++i)
{
if (!cache.get(i)) load_row(i);
}
}) ;
prefetch_thread.detach();
}
/// Rewrites both shard files from the rows currently held in the cache.
///
/// Shard layout: row count (uint32), the offsets of the shard's rows
/// (uint64 each), then at each row's offset a record of payload size
/// (uint32), RLE flag (1 byte) and payload bytes. The size is written as a
/// 32-bit value because that is the width load_row() reads back.
///
/// NOTE(review): the files are truncated first and only rows found in the
/// cache are written back.
///
/// @param checkpoint when true and a transaction is open, the log file is
///        truncated and the in-memory log is cleared after writing
/// @throws std::runtime_error if a shard file cannot be opened for writing.
void write_file(bool checkpoint = false) {
    std::unique_lock lock(file_mutex);
    const size_t total_rows = row_offsets.size();
    for (int shard_id = 0; shard_id < 2; ++shard_id) {
        const std::string path = get_shard_file(shard_id);
        std::ofstream file(path, std::ios::binary | std::ios::trunc);
        if (!file) throw std::runtime_error("ファイルオープン失敗: " + path);
        // Even rows go to shard 0, odd rows to shard 1; shard 0 gets the extra
        // row when the total is odd.
        const bool extra_row = (shard_id == 0) && (total_rows % 2 == 1);
        const uint32_t num_rows = static_cast<uint32_t>(total_rows / 2 + (extra_row ? 1 : 0));
        file.write(reinterpret_cast<const char*>(&num_rows), sizeof(num_rows));
        for (size_t i = static_cast<size_t>(shard_id); i < total_rows; i += 2) {
            file.write(reinterpret_cast<const char*>(&row_offsets[i]), sizeof(row_offsets[i]));
        }
        for (size_t i = static_cast<size_t>(shard_id); i < total_rows; i += 2) {
            Row* row = cache.get(i);
            if (!row) continue;
            // Encode when the data changed, or when no encoded form exists yet.
            if (row->dirty || row->compressed.empty()) {
                row->compress(hdd_rpm < 5400);
            }
            // Use the actual buffer length rather than compressed_size, which is
            // only updated by compress().
            const uint32_t payload_size = static_cast<uint32_t>(row->compressed.size());
            const uint8_t rle_flag = row->use_rle ? 1 : 0;
            file.seekp(static_cast<std::streamoff>(row_offsets[i]));
            file.write(reinterpret_cast<const char*>(&payload_size), sizeof(payload_size));
            file.write(reinterpret_cast<const char*>(&rle_flag), sizeof(rle_flag));
            file.write(row->compressed.data(), payload_size);
        }
    }
    if (checkpoint && in_transaction) {
        log_file.close();
        log_file.open(shard_dir + "/" + filename + ".log", std::ios::trunc);
        log_entries.clear();
        log_size = 0;
    }
}
/// Records one operation of the open transaction in the log file and in
/// `log_entries`; does nothing outside a transaction.
///
/// Log line format: "<op> <row_idx> <value>..." separated by single spaces.
/// For "SET" entries the current bytes of the row are stored next to the line.
/// Once more than 1 MiB has been logged, a checkpointing write_file(true) runs.
///
/// @param op      operation name
/// @param row_idx affected row, or size_t(-1) when not applicable
/// @param values  further fields appended to the log line
void log_operation(const std::string& op, size_t row_idx = -1, const std::vector<std::string>& values = {}) {
    if (!in_transaction) return;
    std::string line = op + " " + std::to_string(row_idx);
    for (const auto& v : values) {
        line += " ";
        line += v;
    }
    std::vector<char> old_data;
    if (op == "SET" && row_idx != static_cast<size_t>(-1)) {
        if (Row* row = cache.get(row_idx)) old_data = row->data;
    }
    bool need_checkpoint = false;
    {
        std::unique_lock lock(file_mutex);
        log_size += line.size() + old_data.size();
        log_file << line << "\n";
        log_entries.emplace_back(std::move(line), std::move(old_data));
        need_checkpoint = log_size > 1024 * 1024;
    }
    // write_file() locks file_mutex itself, so it must be called only after
    // the lock above has been released (std::mutex is not recursive).
    if (need_checkpoint) write_file(true);
}
public :
/// Opens (or creates) the table files in `dir`.
///
/// Missing shard files are created with a row count of zero; existing shard
/// files are left untouched. If a log file was already present before this
/// call, it is replayed through recover().
///
/// @param fname base name of the shard, log and index files
/// @param dir   directory holding the files; created if missing
/// @param rpm   drive speed hint stored in `hdd_rpm`
HDDUltimateTable(const std::string& fname, const std::string& dir = ".", double rpm = 7200)
    : filename(fname), shard_dir(dir), hdd_rpm(rpm) {
    // create_directories also handles nested paths such as "a/b/c".
    std::filesystem::create_directories(shard_dir);
    for (int shard_id = 0; shard_id < 2; ++shard_id) {
        const std::string path = get_shard_file(shard_id);
        if (std::filesystem::exists(path)) continue;
        std::ofstream file(path, std::ios::binary | std::ios::trunc);
        const uint32_t num_rows = 0;
        file.write(reinterpret_cast<const char*>(&num_rows), sizeof(num_rows));
    }
    const std::string log_path = shard_dir + "/" + filename + ".log";
    // Must be checked before opening: opening in append mode creates the file.
    const bool had_log = std::filesystem::exists(log_path);
    log_file.open(log_path, std::ios::app);
    if (had_log) recover();
}
/// Closes the log file if it is still open.
~HDDUltimateTable() {
    if (!log_file.is_open()) {
        return;
    }
    log_file.close();
}
/// Resets the schema, row offsets, free list and string table, then rewrites
/// the shard files.
///
/// @throws std::runtime_error when called while a transaction is open.
void CreateTable() {
    std::unique_lock guard(transaction_mutex);
    if (in_transaction) {
        throw std::runtime_error("トランザクション中はCREATE TABLE不可");
    }

    // Schema.
    columns.clear();
    row_size = 0;

    // Row bookkeeping.
    row_offsets.clear();
    free_list.clear();

    // String table.
    strings.clear();
    next_string_idx = 0;

    write_file();
}
void AddColumn (const std ::string & name, DataType type) {
std ::unique_lock lock (transaction_mutex) ;
Column col (name, type) ;
col.offset = row_size;
row_size += col.size;
columns.push_back(col);
}
/// Opens a transaction and discards any in-memory log entries.
///
/// @throws std::runtime_error when a transaction is already open.
void begin_transaction() {
    std::unique_lock guard(transaction_mutex);
    if (in_transaction) {
        throw std::runtime_error("既にトランザクション中");
    }
    log_entries.clear();
    in_transaction = true;
}
/// Ends the open transaction, keeping its changes.
///
/// The rows are written out with a checkpoint while the transaction is still
/// marked open, which makes write_file() truncate the log: entries of a
/// committed transaction must not be replayed on the next start. The
/// transaction is marked closed only after the write has returned, so it
/// stays open if the write throws.
///
/// @throws std::runtime_error when no transaction is open.
void commit() {
    std::unique_lock lock(transaction_mutex);
    if (!in_transaction) throw std::runtime_error("トランザクションが開始されていません");
    write_file(true);
    in_transaction = false;
    log_entries.clear();
}
/// Ends the open transaction and undoes its logged operations in reverse order.
///
/// - INSERT: the row is replaced by a null cache entry and its slot is freed.
/// - DELETE: the slot is taken off the free list and the row is loaded again.
/// - SET:    the row bytes stored with the log entry are put back.
/// Entries that cannot be parsed or that name a row outside `row_offsets`
/// are skipped.
///
/// @throws std::runtime_error when no transaction is open.
void rollback() {
    std::unique_lock lock(transaction_mutex);
    if (!in_transaction) throw std::runtime_error("トランザクションが開始されていません");
    in_transaction = false;
    for (auto it = log_entries.rbegin(); it != log_entries.rend(); ++it) {
        std::istringstream iss(it->first);
        std::string op;
        size_t row_idx = 0;
        if (!(iss >> op >> row_idx)) continue;
        if (row_idx >= row_offsets.size()) continue;
        if (op == "INSERT") {
            cache.put(row_idx, nullptr);
            free_list.push_back({ row_offsets[row_idx], 0 });
        }
        else if (op == "DELETE") {
            // The row is live again, so its slot must not be handed out.
            const uint64_t offset = row_offsets[row_idx];
            free_list.remove_if([offset](const auto& slot) { return slot.first == offset; });
            load_row(row_idx);
        }
        else if (op == "SET") {
            Row* row = cache.get(row_idx);
            if (row && !it->second.empty()) {
                row->data = it->second;
                row->dirty = true;  // make sure the restored bytes are re-encoded on write
            }
        }
    }
    log_entries.clear();
    write_file();
}
/// Inserts a new row built from one textual value per column.
///
/// Numeric columns are parsed with the std::sto* family; string-like columns
/// store the string-table index of the value. The row takes over a slot from
/// the free list when one is usable, otherwise it is appended.
///
/// NOTE(review): the free-list fit test compares against
/// `row->compressed_size`, which is still 0 for a new row, so the first
/// usable free slot is always taken.
///
/// @param values textual values, at least one per column, in column order
/// @throws std::runtime_error if fewer values than columns are supplied
/// @throws std::invalid_argument / std::out_of_range from number parsing
void insert(const std::vector<std::string>& values) {
    // Exclusive lock: row_offsets and free_list are modified below.
    std::unique_lock lock(transaction_mutex);
    if (values.size() < columns.size()) throw std::runtime_error("値の数が列数より少ない");

    auto row = std::make_unique<Row>(row_size);
    for (size_t i = 0; i < columns.size(); ++i) {
        char* ptr = row->data.data() + columns[i].offset;
        const std::string& text = values[i];
        // memcpy instead of a pointer cast: the field may be unaligned.
        const auto store = [ptr](const auto v) { std::memcpy(ptr, &v, sizeof(v)); };
        switch (columns[i].type) {
        case DataType::BOOL: store(static_cast<char>(std::stoi(text) != 0)); break;
        case DataType::FLOAT: store(std::stof(text)); break;
        case DataType::DOUBLE: store(std::stod(text)); break;
        case DataType::INT: store(static_cast<int32_t>(std::stoi(text))); break;
        case DataType::SHORT: store(static_cast<int16_t>(std::stoi(text))); break;
        case DataType::BYTE: store(static_cast<int8_t>(std::stoi(text))); break;
        case DataType::LONG: store(static_cast<int64_t>(std::stoll(text))); break;
        case DataType::UINT: store(static_cast<uint32_t>(std::stoul(text))); break;
        case DataType::USHORT: store(static_cast<uint16_t>(std::stoul(text))); break;
        case DataType::UBYTE: store(static_cast<uint8_t>(std::stoul(text))); break;
        case DataType::ULONG: store(static_cast<uint64_t>(std::stoull(text))); break;
        case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
            store(static_cast<uint32_t>(get_string_idx(text, columns[i].type))); break;
        case DataType::DATETIME: store(static_cast<int64_t>(std::stoll(text))); break;
        case DataType::DATE: store(static_cast<int32_t>(std::stoi(text))); break;
        case DataType::TIME: store(static_cast<int32_t>(std::stoi(text))); break;
        }
    }
    row->dirty = true;

    // Pick a slot: a free one whose offset still belongs to a known row,
    // otherwise a new one at the end.
    size_t row_idx = row_offsets.size();
    bool reused = false;
    for (auto it = free_list.begin(); it != free_list.end(); ++it) {
        if (it->second < row->compressed_size) continue;
        const auto slot = std::find(row_offsets.begin(), row_offsets.end(), it->first);
        if (slot == row_offsets.end()) continue;  // stale entry: no row has this offset
        row_idx = static_cast<size_t>(slot - row_offsets.begin());
        free_list.erase(it);
        reused = true;
        break;
    }
    if (!reused) {
        const size_t n = row_offsets.size();
        const uint64_t offset = sizeof(uint32_t) + n * sizeof(uint64_t) +
            n * (row_size + sizeof(uint32_t) + sizeof(bool));
        row_offsets.push_back(offset);
    }

    // Index only now that the final row index is known.
    for (size_t i = 0; i < columns.size(); ++i) {
        const auto idx = indexes.find(i);
        if (idx != indexes.end()) idx->second.insert(values[i], row_idx);
    }

    cache.put(row_idx, std::move(row));
    log_operation("INSERT", row_idx, values);
}
std ::vector <size_t > search_simd(size_t col_idx, const std ::string & target, bool use_index = true ) {
std ::shared_lock lock (transaction_mutex) ;
if (use_index && indexes.count(col_idx)) {
return indexes[col_idx].find(target);
}
std ::vector <size_t > results;
if (columns[col_idx].type != DataType::INT) return results;
int32_t target_val = std ::stoi(target);
constexpr size_t SIMD_WIDTH = 8 ;
prefetch_rows(0 , std ::min<size_t >(SIMD_WIDTH * 2 , row_offsets.size()));
for (size_t i = 0 ; i < row_offsets.size(); i += SIMD_WIDTH) {
__m256i target_vec = _mm256_set1_epi32(target_val);
alignas(32 ) int32_t values[SIMD_WIDTH];
for (size_t j = 0 ; j < SIMD_WIDTH && (i + j) < row_offsets.size(); ++j) {
load_row(i + j);
Row* row = cache.get(i + j);
if (row) std ::memcpy (&values[j], row->data.data() + columns[col_idx].offset, sizeof (int32_t ));
else values[j] = 0 ;
}
__m256i data_vec = _mm256_load_si256(reinterpret_cast <const __m256i*>(values));
__m256i cmp = _mm256_cmpeq_epi32(data_vec, target_vec);
int mask = _mm256_movemask_ps(_mm256_castsi256_ps(cmp));
for (size_t j = 0 ; j < SIMD_WIDTH && (i + j) < row_offsets.size(); ++j) {
if (mask & (1 << j) && cache.get(i + j)) results.push_back(i + j);
}
}
return results;
}
/// Sets column `col_idx` of every cached row to `new_value`, keeps the
/// column's index (if any) in step, and logs one "UPDATE_ALL" entry.
///
/// For tables of 1000 rows or more a notice is printed first; the update
/// itself is the same for every table size.
///
/// The table lock is not held across get_value()/set_value(): both lock it
/// themselves, and locking a std::shared_mutex again from the thread that
/// already holds it is undefined behaviour.
void update_all(size_t col_idx, const std::string& new_value) {
    size_t total = 0;
    {
        std::shared_lock lock(transaction_mutex);
        total = row_offsets.size();
    }
    if (total >= 1000) {
        std::cout << "OpenCLで全行更新(擬似)\n";
    }
    for (size_t i = 0; i < total; ++i) {
        if (!cache.get(i)) continue;
        const std::string old_value = get_value(i, col_idx);
        set_value(i, col_idx, new_value);
        std::shared_lock lock(transaction_mutex);
        const auto idx = indexes.find(col_idx);
        if (idx != indexes.end()) {
            idx->second.remove(old_value, i);
            idx->second.insert(new_value, i);
        }
    }
    log_operation("UPDATE_ALL", col_idx, { new_value });
}
std ::string get_value (size_t row_idx, size_t col_idx) {
std ::shared_lock lock (transaction_mutex) ;
load_row(row_idx);
Row* row = cache.get(row_idx);
if (!row) return "DELETED" ;
char * ptr = row->data.data() + columns[col_idx].offset;
switch (columns[col_idx].type) {
case DataType::BOOL: return std ::to_string(*(bool *)ptr);
case DataType::FLOAT: return std ::to_string(*(float *)ptr);
case DataType::DOUBLE: return std ::to_string(*(double *)ptr);
case DataType::INT: return std ::to_string(*(int32_t *)ptr);
case DataType::SHORT: return std ::to_string(*(int16_t *)ptr);
case DataType::BYTE: return std ::to_string(*(int8_t *)ptr);
case DataType::LONG: return std ::to_string(*(int64_t *)ptr);
case DataType::UINT: return std ::to_string(*(uint32_t *)ptr);
case DataType::USHORT: return std ::to_string(*(uint16_t *)ptr);
case DataType::UBYTE: return std ::to_string(*(uint8_t *)ptr);
case DataType::ULONG: return std ::to_string(*(uint64_t *)ptr);
case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
return strings[*(uint32_t *)ptr];
case DataType::DATETIME: return std ::to_string(*(int64_t *)ptr);
case DataType::DATE: return std ::to_string(*(int32_t *)ptr);
case DataType::TIME: return std ::to_string(*(int32_t *)ptr);
default : return "" ;
}
}
void set_value (size_t row_idx, size_t col_idx, const std ::string & value) {
std ::shared_lock lock (transaction_mutex) ;
load_row(row_idx);
Row* row = cache.get(row_idx);
if (!row) return ;
char * ptr = row->data.data() + columns[col_idx].offset;
switch (columns[col_idx].type) {
case DataType::BOOL: *ptr = std ::stoi(value) != 0 ; break ;
case DataType::FLOAT: *(float *)ptr = std ::stof(value); break ;
case DataType::DOUBLE: *(double *)ptr = std ::stod(value); break ;
case DataType::INT: *(int32_t *)ptr = std ::stoi(value); break ;
case DataType::SHORT: *(int16_t *)ptr = std ::stoi(value); break ;
case DataType::BYTE: *(int8_t *)ptr = std ::stoi(value); break ;
case DataType::LONG: *(int64_t *)ptr = std ::stoll(value); break ;
case DataType::UINT: *(uint32_t *)ptr = std ::stoul(value); break ;
case DataType::USHORT: *(uint16_t *)ptr = std ::stoul(value); break ;
case DataType::UBYTE: *(uint8_t *)ptr = std ::stoul(value); break ;
case DataType::ULONG: *(uint64_t *)ptr = std ::stoull(value); break ;
case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
*(uint32_t *)ptr = get_string_idx(value, columns[col_idx].type); break ;
case DataType::DATETIME: *(int64_t *)ptr = std ::stoll(value); break ;
case DataType::DATE: *(int32_t *)ptr = std ::stoi(value); break ;
case DataType::TIME: *(int32_t *)ptr = std ::stoi(value); break ;
}
row->dirty = true ;
log_operation("SET" , row_idx, { std ::to_string(col_idx), value });
}
void delete_row (size_t row_idx) {
std ::shared_lock lock (transaction_mutex) ;
load_row(row_idx);
Row* row = cache.get(row_idx);
if (!row) return ;
free_list.push_back({ row_offsets[row_idx], row->compressed_size });
for (size_t i = 0 ; i < columns.size(); ++i) {
if (indexes.count(i)) indexes[i].remove(get_value(row_idx, i), row_idx);
}
cache.put(row_idx, nullptr );
log_operation("DELETE" , row_idx);
}
/// Writes the cached rows out to the shard files (no checkpoint).
void flush() {
    write_file(false);
}
/// Prints every row to standard output, one line per row, as
/// "<column name>: <value> " for each column.
///
/// The row count and column names are copied under the lock, which is then
/// released before get_value() is called: get_value() locks the same mutex
/// itself, and re-locking a std::shared_mutex from the owning thread is
/// undefined behaviour.
void print() {
    size_t total_rows = 0;
    std::vector<std::string> names;
    {
        std::shared_lock lock(transaction_mutex);
        total_rows = row_offsets.size();
        names.reserve(columns.size());
        for (const auto& col : columns) names.push_back(col.name);
    }
    for (size_t i = 0; i < total_rows; ++i) {
        for (size_t j = 0; j < names.size(); ++j) {
            std::cout << names[j] << ": " << get_value(i, j) << " ";
        }
        std::cout << "\n";
    }
}
/// Creates an index for column `col_idx` (backed by the file
/// "<shard_dir>/<filename>.idx<col_idx>") and fills it from the rows that are
/// currently in the cache. Does nothing if the column already has an index.
///
/// The index object is constructed in place inside the map, so it needs to
/// be neither copyable nor assignable. The exclusive lock is released before
/// the rows are read, because get_value() locks the same mutex itself.
///
/// @throws std::runtime_error if `col_idx` is not a valid column.
void create_index(size_t col_idx) {
    size_t total_rows = 0;
    {
        std::unique_lock lock(transaction_mutex);
        if (col_idx >= columns.size()) throw std::runtime_error("無効な列インデックス");
        const bool created = indexes.try_emplace(
            col_idx, shard_dir + "/" + filename + ".idx" + std::to_string(col_idx)).second;
        if (!created) return;
        total_rows = row_offsets.size();
    }
    for (size_t i = 0; i < total_rows; ++i) {
        if (!cache.get(i)) continue;
        const std::string value = get_value(i, col_idx);
        std::shared_lock lock(transaction_mutex);
        const auto idx = indexes.find(col_idx);
        if (idx != indexes.end()) idx->second.insert(value, i);
    }
}
/// Returns the cache statistics followed by the current log size in bytes.
std::string stats() {
    std::string summary = cache.stats();
    summary += ", Log Size: ";
    summary += std::to_string(log_size);
    summary += " bytes";
    return summary;
}
/// Groups the cached rows by the value of column `group_col_idx`.
///
/// For each group the aggregate is either the number of rows (default) or,
/// with `sum_agg`, the sum of column `agg_col_idx` parsed as a 64-bit integer.
/// Each result holds one text value per column (the group value in the
/// grouping column, the aggregate in the aggregate column, and the values of
/// the group's first row elsewhere) paired with the index of that first row.
/// Results are ordered by group value.
///
/// The table lock is not held across get_value(), which locks it itself.
///
/// @throws std::invalid_argument / std::out_of_range if `sum_agg` is set and
///         a value of the aggregate column is not an integer.
std::vector<std::pair<std::vector<std::string>, size_t>> group_by(size_t group_col_idx, size_t agg_col_idx, bool sum_agg = false) {
    size_t total_rows = 0;
    size_t total_cols = 0;
    {
        std::shared_lock lock(transaction_mutex);
        total_rows = row_offsets.size();
        total_cols = columns.size();
    }

    // group value -> (aggregate, index of the first row of the group)
    std::map<std::string, std::pair<int64_t, size_t>> groups;
    for (size_t i = 0; i < total_rows; ++i) {
        if (!cache.get(i)) continue;
        const std::string group_val = get_value(i, group_col_idx);
        const int64_t agg_val = sum_agg ? std::stoll(get_value(i, agg_col_idx)) : 1;
        const auto [slot, inserted] = groups.try_emplace(group_val, agg_val, i);
        if (!inserted) slot->second.first += agg_val;
    }

    std::vector<std::pair<std::vector<std::string>, size_t>> results;
    results.reserve(groups.size());
    for (const auto& [val, data] : groups) {
        std::vector<std::string> row;
        row.reserve(total_cols);
        for (size_t j = 0; j < total_cols; ++j) {
            if (j == group_col_idx) row.push_back(val);
            else if (j == agg_col_idx) row.push_back(std::to_string(data.first));
            else row.push_back(get_value(data.second, j));
        }
        results.emplace_back(std::move(row), data.second);
    }
    return results;
}
/// Sorts `rows` in place by the text in position `sort_col_idx` of each
/// row's value list, ascending by default.
///
/// Values are compared as strings (so "10" sorts before "9"). A row that has
/// no value at `sort_col_idx` is compared as if it held an empty string.
void order_by(std::vector<std::pair<std::vector<std::string>, size_t>>& rows, size_t sort_col_idx, bool ascending = true) {
    std::shared_lock lock(transaction_mutex);
    static const std::string missing;
    // Returns a reference, so comparing does not copy any strings.
    const auto key = [sort_col_idx](const auto& entry) -> const std::string& {
        return sort_col_idx < entry.first.size() ? entry.first[sort_col_idx] : missing;
    };
    std::sort(rows.begin(), rows.end(),
        [&key, ascending](const auto& a, const auto& b) {
            return ascending ? key(a) < key(b) : key(a) > key(b);
        });
}
private :
void recover () {
std ::ifstream log (shard_dir + "/" + filename + ".log" ) ;
std ::string line;
while (std ::getline(log , line)) {
std ::istringstream iss (line) ;
std ::string op;
size_t row_idx;
iss >> op >> row_idx;
if (op == "INSERT" ) {
load_row(row_idx);
}
else if (op == "DELETE" ) {
cache.put(row_idx, nullptr );
free_list.push_back({ row_offsets[row_idx], 0 });
}
else if (op == "SET" ) {
size_t col_idx;
std ::string value;
iss >> col_idx >> value;
set_value(row_idx, col_idx, value);
}
}
write_file();
}
/// Loads every known row into the cache, in row-index order.
void load_all_rows() {
    size_t next = 0;
    while (next < row_offsets.size()) {
        load_row(next);
        ++next;
    }
}
};
// NOTE: "copy" was a stray paste marker here; the text below repeats the definitions above.
#include <iostream>
#include <fstream>
#include <vector>
#include <string>
#include <cstdint>
#include <stdexcept>
#include <memory>
#include <immintrin.h>
#include <zlib.h>
#include <thread>
#include <mutex>
#include <list>
#include <map>
#include <set>
#include <shared_mutex>
#include <chrono>
#include <filesystem>
#include <algorithm>
/// Logical column types a table column can have.
enum class DataType
{
    BOOL, FLOAT, DOUBLE, INT, SHORT, BYTE, LONG, UINT, USHORT, UBYTE, ULONG,
    STRING, WSTRING, BLOB, DATETIME, DATE, TIME
};

/// Returns the number of bytes a value of `type` occupies inside a row buffer.
///
/// STRING/WSTRING/BLOB report 4 because the row stores a 32-bit value for
/// them rather than the payload itself. Unknown enumerator values yield 0.
///
/// Defined before `Column` because `Column`'s constructor calls it; a
/// namespace-scope function must be declared before its point of use.
size_t get_type_size(DataType type)
{
    switch (type)
    {
    case DataType::BOOL: return 1;
    case DataType::FLOAT: return 4;
    case DataType::DOUBLE: return 8;
    case DataType::INT: case DataType::UINT: return 4;
    case DataType::SHORT: case DataType::USHORT: return 2;
    case DataType::BYTE: case DataType::UBYTE: return 1;
    case DataType::LONG: case DataType::ULONG: return 8;
    case DataType::STRING: case DataType::WSTRING: case DataType::BLOB: return 4;
    case DataType::DATETIME: return 8;
    case DataType::DATE: return 4;
    case DataType::TIME: return 4;
    default: return 0;
    }
}

/// Schema entry describing one column of a row.
struct Column
{
    std::string name;   ///< Column name.
    DataType type;      ///< Logical type of the stored value.
    size_t size;        ///< Width in bytes, derived from `type`.
    size_t offset = 0;  ///< Byte offset inside the row buffer; assigned by the owner.

    /// @param n column name
    /// @param t column type; `size` is derived from it via get_type_size()
    Column(const std::string& n, DataType t) : name(n), type(t), size(get_type_size(t)) {}
};
struct Row
{
std ::vector <char > data;
std ::vector <char > compressed;
bool dirty = false ;
bool is_compressed = false ;
size_t compressed_size = 0 ;
bool use_rle = false ;
Row(size_t size) : data(size) {}
void compress (bool prefer_rle = false )
{
use_rle = prefer_rle && (data.size() > 100 && std ::adjacent_find(data.begin(), data.end(), std ::not_equal_to<>()) == data.end());
if (use_rle)
{
compressed.clear();
for (size_t i = 0 ; i < data.size();)
{
uint8_t count = 1 ;
while (i + count < data.size() && data[i] == data[i + count] && count < 255 ) count++;
compressed.push_back(data[i]);
compressed.push_back(count);
i += count;
}
}
else
{
uLongf dest_len = compressBound(data.size());
compressed.resize(dest_len);
compress(reinterpret_cast <Bytef*>(compressed.data()), &dest_len,
reinterpret_cast <const Bytef*>(data.data()), data.size());
compressed.resize(dest_len);
}
compressed_size = compressed.size();
is_compressed = true ;
}
void decompress (size_t orig_size)
{
data.resize(orig_size);
if (use_rle) {
size_t pos = 0 ;
for (size_t i = 0 ; i < compressed.size() && pos < orig_size; i += 2 ) {
char value = compressed[i];
uint8_t count = compressed[i + 1 ];
for (uint8_t j = 0 ; j < count && pos < orig_size; ++j) {
data[pos++] = value;
}
}
}
else {
uLongf dest_len = orig_size;
uncompress(reinterpret_cast <Bytef*>(data.data()), &dest_len,
reinterpret_cast <const Bytef*>(compressed.data()), compressed.size());
}
is_compressed = false ;
}
};
/// Value -> row-index lookup table backed by a file.
///
/// Every mutation rewrites the whole backing file. The file layout is:
/// entry count (uint32), then per entry: value length (uint32), value bytes,
/// row count (uint32), row indexes (size_t each).
class BTreeIndex {
    std::map<std::string, std::set<size_t>> index;
    // `mutable` so the const find() can take a shared lock.
    mutable std::shared_mutex mutex;
    std::string filename;

public:
    /// Creates an empty index with no backing file (nothing is persisted).
    BTreeIndex() = default;

    /// Creates an index backed by `fname`, loading it if the file exists.
    BTreeIndex(const std::string& fname) : filename(fname) {
        if (std::filesystem::exists(fname)) load();
    }

    BTreeIndex(const BTreeIndex&) = delete;
    BTreeIndex& operator=(const BTreeIndex&) = delete;

    /// Takes over the contents and backing file name of `other`.
    /// The mutex itself is not movable, so each object keeps its own.
    BTreeIndex(BTreeIndex&& other) {
        std::unique_lock lock(other.mutex);
        index = std::move(other.index);
        filename = std::move(other.filename);
    }

    BTreeIndex& operator=(BTreeIndex&& other) {
        if (this != &other) {
            std::scoped_lock lock(mutex, other.mutex);
            index = std::move(other.index);
            filename = std::move(other.filename);
        }
        return *this;
    }

    /// Associates `row_idx` with `value` and persists the index.
    void insert(const std::string& value, size_t row_idx) {
        std::unique_lock lock(mutex);
        index[value].insert(row_idx);
        save();
    }

    /// Removes the association of `row_idx` with `value`, if present.
    /// A value whose last row is removed is dropped from the index.
    void remove(const std::string& value, size_t row_idx) {
        std::unique_lock lock(mutex);
        const auto it = index.find(value);
        if (it == index.end()) return;
        if (it->second.erase(row_idx) == 0) return;
        if (it->second.empty()) index.erase(it);
        save();
    }

    /// Returns the row indexes recorded for `value` in ascending order
    /// (empty when the value is unknown).
    std::vector<size_t> find(const std::string& value) const {
        std::shared_lock lock(mutex);
        const auto it = index.find(value);
        if (it == index.end()) return {};
        return std::vector<size_t>(it->second.begin(), it->second.end());
    }

private:
    /// Rewrites the backing file from the in-memory map. Caller holds `mutex`.
    void save() {
        if (filename.empty()) return;
        std::ofstream file(filename, std::ios::binary | std::ios::trunc);
        if (!file) return;
        const auto write_u32 = [&file](uint32_t v) {
            file.write(reinterpret_cast<const char*>(&v), sizeof(v));
        };
        write_u32(static_cast<uint32_t>(index.size()));
        for (const auto& [val, rows] : index) {
            write_u32(static_cast<uint32_t>(val.size()));
            file.write(val.data(), static_cast<std::streamsize>(val.size()));
            write_u32(static_cast<uint32_t>(rows.size()));
            for (const size_t row : rows) {
                file.write(reinterpret_cast<const char*>(&row), sizeof(row));
            }
        }
    }

    /// Reads the backing file into the in-memory map.
    /// Loading stops at the first short read or implausible length, keeping
    /// the entries read so far, so a truncated or corrupt file cannot trigger
    /// a huge allocation or an unbounded loop.
    void load() {
        std::ifstream file(filename, std::ios::binary);
        if (!file) return;
        std::error_code ec;
        const auto file_bytes = std::filesystem::file_size(filename, ec);
        if (ec) return;
        const auto read_u32 = [&file](uint32_t& out) {
            return static_cast<bool>(file.read(reinterpret_cast<char*>(&out), sizeof(out)));
        };
        uint32_t size = 0;
        if (!read_u32(size)) return;
        for (uint32_t i = 0; i < size; ++i) {
            uint32_t val_len = 0;
            if (!read_u32(val_len) || val_len > file_bytes) return;
            std::string val(val_len, '\0');
            if (!file.read(val.data(), val_len)) return;
            uint32_t row_count = 0;
            if (!read_u32(row_count) || row_count > file_bytes / sizeof(size_t)) return;
            std::set<size_t> rows;
            for (uint32_t j = 0; j < row_count; ++j) {
                size_t row = 0;
                if (!file.read(reinterpret_cast<char*>(&row), sizeof(row))) return;
                rows.insert(row);
            }
            index[std::move(val)] = std::move(rows);
        }
    }
};
/// Least-recently-used cache of rows keyed by row index.
///
/// A null row may be stored for a key; get() then returns nullptr for it,
/// exactly as it does for an absent key (but it is counted as a hit).
///
/// NOTE(review): get() hands out a raw pointer into the cache; a later put()
/// that replaces or evicts that entry invalidates the pointer.
class LRUCache
{
    struct Entry
    {
        std::unique_ptr<Row> row;
        std::list<size_t>::iterator pos;  // position of the key in lru_order
    };

    size_t max_size;
    std::unordered_map<size_t, Entry> cache;
    std::list<size_t> lru_order;  // most recently used key at the front
    std::shared_mutex mutex;
    size_t hit_count = 0, miss_count = 0;

    /// Drops least-recently-used entries until at most `max_size` remain.
    /// The caller must already hold `mutex` exclusively; locking here again
    /// would self-deadlock because the mutex is not recursive.
    void evict()
    {
        while (cache.size() > max_size && !lru_order.empty())
        {
            const size_t oldest = lru_order.back();
            lru_order.pop_back();
            cache.erase(oldest);
        }
    }

public:
    LRUCache(size_t size) : max_size(size) {}

    /// Returns the row stored for `row_idx` (possibly nullptr) and marks the
    /// key as most recently used; returns nullptr when the key is absent.
    Row* get(size_t row_idx)
    {
        // Exclusive lock: a lookup updates the counters and the LRU order.
        std::unique_lock lock(mutex);
        const auto it = cache.find(row_idx);
        if (it == cache.end())
        {
            ++miss_count;
            return nullptr;
        }
        ++hit_count;
        lru_order.splice(lru_order.begin(), lru_order, it->second.pos);
        return it->second.row.get();
    }

    /// Stores `row` under `row_idx`, replacing any previous row for that key,
    /// then evicts least-recently-used entries beyond the size limit.
    void put(size_t row_idx, std::unique_ptr<Row> row)
    {
        std::unique_lock lock(mutex);
        const auto it = cache.find(row_idx);
        if (it != cache.end())
        {
            // Reuse the existing LRU node so a key never appears twice.
            it->second.row = std::move(row);
            lru_order.splice(lru_order.begin(), lru_order, it->second.pos);
            return;
        }
        lru_order.push_front(row_idx);
        cache.emplace(row_idx, Entry{ std::move(row), lru_order.begin() });
        evict();
    }

    /// Grows the size limit by 10 when the hit rate is below the threshold,
    /// or shrinks it by 5 (never below 50) when the hit rate exceeds the
    /// threshold by more than 0.1.
    void adjust_size(double hit_rate_threshold)
    {
        std::unique_lock lock(mutex);
        const double hit_rate = hit_count / static_cast<double>(hit_count + miss_count + 1);
        if (hit_rate < hit_rate_threshold) max_size += 10;
        else if (hit_rate > hit_rate_threshold + 0.1 && max_size > 50) max_size -= 5;
    }

    /// Returns a human-readable summary of hits, misses and the size limit.
    std::string stats()
    {
        std::shared_lock lock(mutex);
        return "Cache Hit: " + std::to_string(hit_count) + ", Miss: " + std::to_string(miss_count) +
            ", Size: " + std::to_string(max_size);
    }
};
class HDDUltimateTable
{
public :
// Column schema, in declaration order.
std ::vector <Column> columns;
// Size in bytes of one uncompressed row (sum of the column sizes).
size_t row_size = 0 ;
// Byte offset of each row's record inside its shard file, indexed by row index.
std ::vector <uint64_t > row_offsets;
// Reusable record slots as (file offset, size) pairs.
std ::list <std ::pair<uint64_t , size_t >> free_list;
// String table; string-like columns store an index into this vector.
std ::vector <std ::string > strings;
// Counter used when handing out string-table indexes.
uint32_t next_string_idx = 0 ;
// In-memory row cache with an initial size limit of 100.
LRUCache cache{ 100 };
// Per-column indexes, keyed by column index.
std ::map <size_t , BTreeIndex> indexes;
private :
// Base name used to build shard, log and index file names.
std ::string filename;
// Directory that holds the shard, log and index files.
std ::string shard_dir;
// Guards the string table, the log state and shard-file writes.
std ::mutex file_mutex;
// Taken by the public operations; exclusively by the transaction control calls.
std ::shared_mutex transaction_mutex;
// True between begin_transaction() and commit()/rollback().
bool in_transaction = false ;
// Append-only text log of the operations of the current transaction.
std ::ofstream log_file;
// In-memory copy of the log: (log line, row bytes captured for SET entries).
std ::vector <std ::pair<std ::string , std ::vector <char >>> log_entries;
// Accumulated size of the logged lines and captured row bytes.
size_t log_size = 0 ;
// Drive speed hint; values below 5400 make write_file() prefer RLE encoding.
double hdd_rpm = 7200 ;
/// Returns the string-table index of `str`, appending it if it is not yet present.
///
/// The index is derived from the position inside `strings`, so it stays
/// correct even if `next_string_idx` and the table were to get out of step;
/// `next_string_idx` is kept equal to the table size.
///
/// @param str  string to intern
/// @param type column type of the caller (currently unused)
uint32_t get_string_idx(const std::string& str, DataType type)
{
    (void)type;
    std::unique_lock lock(file_mutex);
    const auto hit = std::find(strings.begin(), strings.end(), str);
    if (hit != strings.end())
    {
        return static_cast<uint32_t>(hit - strings.begin());
    }
    strings.push_back(str);
    next_string_idx = static_cast<uint32_t>(strings.size());
    return next_string_idx - 1;
}
/// Builds the path of the shard file with the given id:
/// "<shard_dir>/<filename>_shard<id>.bin".
std::string get_shard_file(int shard_id)
{
    std::string path = shard_dir;
    path += "/";
    path += filename;
    path += "_shard";
    path += std::to_string(shard_id);
    path += ".bin";
    return path;
}
void load_row (size_t row_idx)
{
if (cache.get(row_idx)) return ;
int shard_id = row_idx % 2 ;
std ::ifstream file (get_shard_file(shard_id), std ::ios::binary) ;
if (!file) throw std ::runtime_error("ファイルオープン失敗: " + get_shard_file(shard_id));
file.seekg(row_offsets[row_idx]);
auto row = std ::make_unique<Row>(row_size);
uint32_t comp_size;
bool is_rle;
file.read(reinterpret_cast <char *>(&comp_size), sizeof (comp_size));
file.read(reinterpret_cast <char *>(&is_rle), sizeof (is_rle));
row->compressed.resize(comp_size);
file.read(row->compressed.data(), comp_size);
row->decompress(row_size);
row->use_rle = is_rle;
cache.put(row_idx, std ::move(row));
}
void prefetch_rows (size_t start_idx, size_t count)
{
std ::thread prefetch_thread ([this , start_idx, count]()
{
for (size_t i = start_idx; i < start_idx + count && i < row_offsets.size(); ++i)
{
if (!cache.get(i)) load_row(i);
}
}) ;
prefetch_thread.detach();
}
/// Rewrites both shard files from the rows currently held in the cache.
///
/// Shard layout: row count (uint32), the offsets of the shard's rows
/// (uint64 each), then at each row's offset a record of payload size
/// (uint32), RLE flag (1 byte) and payload bytes. The size is written as a
/// 32-bit value because that is the width load_row() reads back.
///
/// NOTE(review): the files are truncated first and only rows found in the
/// cache are written back.
///
/// @param checkpoint when true and a transaction is open, the log file is
///        truncated and the in-memory log is cleared after writing
/// @throws std::runtime_error if a shard file cannot be opened for writing.
void write_file(bool checkpoint = false) {
    std::unique_lock lock(file_mutex);
    const size_t total_rows = row_offsets.size();
    for (int shard_id = 0; shard_id < 2; ++shard_id) {
        const std::string path = get_shard_file(shard_id);
        std::ofstream file(path, std::ios::binary | std::ios::trunc);
        if (!file) throw std::runtime_error("ファイルオープン失敗: " + path);
        // Even rows go to shard 0, odd rows to shard 1; shard 0 gets the extra
        // row when the total is odd.
        const bool extra_row = (shard_id == 0) && (total_rows % 2 == 1);
        const uint32_t num_rows = static_cast<uint32_t>(total_rows / 2 + (extra_row ? 1 : 0));
        file.write(reinterpret_cast<const char*>(&num_rows), sizeof(num_rows));
        for (size_t i = static_cast<size_t>(shard_id); i < total_rows; i += 2) {
            file.write(reinterpret_cast<const char*>(&row_offsets[i]), sizeof(row_offsets[i]));
        }
        for (size_t i = static_cast<size_t>(shard_id); i < total_rows; i += 2) {
            Row* row = cache.get(i);
            if (!row) continue;
            // Encode when the data changed, or when no encoded form exists yet.
            if (row->dirty || row->compressed.empty()) {
                row->compress(hdd_rpm < 5400);
            }
            // Use the actual buffer length rather than compressed_size, which is
            // only updated by compress().
            const uint32_t payload_size = static_cast<uint32_t>(row->compressed.size());
            const uint8_t rle_flag = row->use_rle ? 1 : 0;
            file.seekp(static_cast<std::streamoff>(row_offsets[i]));
            file.write(reinterpret_cast<const char*>(&payload_size), sizeof(payload_size));
            file.write(reinterpret_cast<const char*>(&rle_flag), sizeof(rle_flag));
            file.write(row->compressed.data(), payload_size);
        }
    }
    if (checkpoint && in_transaction) {
        log_file.close();
        log_file.open(shard_dir + "/" + filename + ".log", std::ios::trunc);
        log_entries.clear();
        log_size = 0;
    }
}
/// Records one operation of the open transaction in the log file and in
/// `log_entries`; does nothing outside a transaction.
///
/// Log line format: "<op> <row_idx> <value>..." separated by single spaces.
/// For "SET" entries the current bytes of the row are stored next to the line.
/// Once more than 1 MiB has been logged, a checkpointing write_file(true) runs.
///
/// @param op      operation name
/// @param row_idx affected row, or size_t(-1) when not applicable
/// @param values  further fields appended to the log line
void log_operation(const std::string& op, size_t row_idx = -1, const std::vector<std::string>& values = {}) {
    if (!in_transaction) return;
    std::string line = op + " " + std::to_string(row_idx);
    for (const auto& v : values) {
        line += " ";
        line += v;
    }
    std::vector<char> old_data;
    if (op == "SET" && row_idx != static_cast<size_t>(-1)) {
        if (Row* row = cache.get(row_idx)) old_data = row->data;
    }
    bool need_checkpoint = false;
    {
        std::unique_lock lock(file_mutex);
        log_size += line.size() + old_data.size();
        log_file << line << "\n";
        log_entries.emplace_back(std::move(line), std::move(old_data));
        need_checkpoint = log_size > 1024 * 1024;
    }
    // write_file() locks file_mutex itself, so it must be called only after
    // the lock above has been released (std::mutex is not recursive).
    if (need_checkpoint) write_file(true);
}
public :
// Creates the table's on-disk files and opens its transaction log.
//
// fname - base name used for the log file (shard names come from get_shard_file).
// dir   - directory holding the shard and log files; created (including any
//         missing parents) if it does not exist.
// rpm   - drive speed hint; write_file prefers RLE compression below 5400.
//
// Both shard files are (re)created with a row count of 0. recover() runs only
// when a log file already existed before this constructor opened it; testing
// for existence after opening in append mode would always succeed, because
// opening creates the file.
//
// NOTE(review): the shard files are truncated before recover() runs, so any
// previous row data is gone by the time the log is replayed — confirm intent.
HDDUltimateTable(const std::string& fname, const std::string& dir = ".", double rpm = 7200)
    : filename(fname), shard_dir(dir), hdd_rpm(rpm) {
    std::filesystem::create_directories(shard_dir);
    for (int shard_id = 0; shard_id < 2; ++shard_id) {
        std::ofstream file(get_shard_file(shard_id), std::ios::binary | std::ios::trunc);
        uint32_t num_rows = 0;
        file.write(reinterpret_cast<const char*>(&num_rows), sizeof(num_rows));
    }
    const std::string log_path = shard_dir + "/" + filename + ".log";
    const bool had_log = std::filesystem::exists(log_path);
    log_file.open(log_path, std::ios::app);
    if (had_log) recover();
}
// Closes the transaction log if it is still open.
~HDDUltimateTable() {
    if (!log_file.is_open()) return;
    log_file.close();
}
// Resets the table to an empty schema with no rows and writes that state to disk.
//
// Throws std::runtime_error when called while a transaction is active.
// Clears the column list, row offsets, free list and string table; the row
// cache and the indexes are not touched here.
void CreateTable() {
    std::unique_lock lock(transaction_mutex);
    if (in_transaction) {
        throw std::runtime_error("トランザクション中はCREATE TABLE不可");
    }

    // Schema.
    columns.clear();
    row_size = 0;

    // Row bookkeeping.
    row_offsets.clear();
    free_list.clear();

    // String table.
    strings.clear();
    next_string_idx = 0;

    write_file();
}
void AddColumn (const std ::string & name, DataType type) {
std ::unique_lock lock (transaction_mutex) ;
Column col (name, type) ;
col.offset = row_size;
row_size += col.size;
columns.push_back(col);
}
// Starts a transaction and discards any previously recorded log entries.
//
// Throws std::runtime_error if a transaction is already active.
void begin_transaction() {
    std::unique_lock lock(transaction_mutex);
    if (!in_transaction) {
        in_transaction = true;
        log_entries.clear();
        return;
    }
    throw std::runtime_error("既にトランザクション中");
}
// Commits the active transaction: writes all rows to disk and discards the log.
//
// Throws std::runtime_error if no transaction is active.
//
// The flush is done as a checkpoint while in_transaction is still true,
// because write_file only truncates the log when both `checkpoint` and
// in_transaction are set. Clearing the flag first would leave the committed
// operations in the log file, where recover() would replay them on the next
// start. If write_file throws, the transaction stays open.
void commit() {
    std::unique_lock lock(transaction_mutex);
    if (!in_transaction) throw std::runtime_error("トランザクションが開始されていません");
    write_file(true);
    in_transaction = false;
    log_entries.clear();
}
void rollback () {
std ::unique_lock lock (transaction_mutex) ;
if (!in_transaction) throw std ::runtime_error("トランザクションが開始されていません" );
in_transaction = false ;
for (auto it = log_entries.rbegin(); it != log_entries.rend(); ++it) {
std ::istringstream iss (it->first) ;
std ::string op;
size_t row_idx;
iss >> op >> row_idx;
if (op == "INSERT" ) {
cache.put(row_idx, nullptr );
free_list.push_back({ row_offsets[row_idx], 0 });
}
else if (op == "DELETE" ) {
load_row(row_idx);
}
else if (op == "SET" ) {
size_t col_idx;
std ::string value;
iss >> col_idx >> value;
Row* row = cache.get(row_idx);
if (row && !it->second.empty()) row->data = it->second;
}
}
log_entries.clear();
write_file();
}
// Inserts one row built from `values` (one textual value per column, in
// column order) and logs the operation.
//
// Throws std::runtime_error when fewer values than columns are supplied.
// Conversion errors from std::stoi and friends propagate; in that case
// nothing has been added to the table or to any index.
//
// Slot selection: the first free-list entry that is large enough and whose
// offset is still present in row_offsets is reused; otherwise the row is
// appended. Index entries are added only after the final row index is known,
// so that a reused slot is indexed under its real row index.
//
// NOTE(review): this mutates row_offsets/free_list/indexes while holding only
// a shared lock on transaction_mutex — confirm callers serialise inserts.
void insert(const std::vector<std::string>& values) {
    std::shared_lock lock(transaction_mutex);
    if (values.size() < columns.size()) {
        throw std::runtime_error("INSERTの値が不足しています");
    }

    auto row = std::make_unique<Row>(row_size);
    for (size_t i = 0; i < columns.size(); ++i) {
        char* ptr = row->data.data() + columns[i].offset;
        const std::string& text = values[i];
        // Writes `v` into the column's bytes using v's own type.
        auto store = [ptr](auto v) { *reinterpret_cast<decltype(v)*>(ptr) = v; };
        switch (columns[i].type) {
        case DataType::BOOL: *ptr = std::stoi(text) != 0; break;
        case DataType::FLOAT: store(std::stof(text)); break;
        case DataType::DOUBLE: store(std::stod(text)); break;
        case DataType::INT: store(static_cast<int32_t>(std::stoi(text))); break;
        case DataType::SHORT: store(static_cast<int16_t>(std::stoi(text))); break;
        case DataType::BYTE: store(static_cast<int8_t>(std::stoi(text))); break;
        case DataType::LONG: store(static_cast<int64_t>(std::stoll(text))); break;
        case DataType::UINT: store(static_cast<uint32_t>(std::stoul(text))); break;
        case DataType::USHORT: store(static_cast<uint16_t>(std::stoul(text))); break;
        case DataType::UBYTE: store(static_cast<uint8_t>(std::stoul(text))); break;
        case DataType::ULONG: store(static_cast<uint64_t>(std::stoull(text))); break;
        case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
            store(static_cast<uint32_t>(get_string_idx(text, columns[i].type))); break;
        case DataType::DATETIME: store(static_cast<int64_t>(std::stoll(text))); break;
        case DataType::DATE: store(static_cast<int32_t>(std::stoi(text))); break;
        case DataType::TIME: store(static_cast<int32_t>(std::stoi(text))); break;
        }
    }
    row->dirty = true;

    // Try to reuse a freed slot first.
    size_t row_idx = row_offsets.size();
    bool reused = false;
    for (auto it = free_list.begin(); it != free_list.end(); ++it) {
        if (it->second < row->compressed_size) continue;
        const auto pos = std::find(row_offsets.begin(), row_offsets.end(), it->first);
        if (pos == row_offsets.end()) continue;  // offset no longer maps to a row
        row_idx = static_cast<size_t>(std::distance(row_offsets.begin(), pos));
        free_list.erase(it);
        reused = true;
        break;
    }
    if (!reused) {
        // Append: the offset is derived from the header size plus the space
        // assumed for the rows already present.
        const uint64_t offset = sizeof(uint32_t) + row_offsets.size() * sizeof(uint64_t)
            + row_offsets.size() * (row_size + sizeof(uint32_t) + sizeof(bool));
        row_idx = row_offsets.size();
        row_offsets.push_back(offset);
    }

    for (size_t i = 0; i < columns.size(); ++i) {
        if (indexes.count(i)) indexes[i].insert(values[i], row_idx);
    }
    cache.put(row_idx, std::move(row));
    log_operation("INSERT", row_idx, values);
}
// Returns the indices of rows whose column `col_idx` equals `target`.
//
// When `use_index` is true and an index exists for the column, the index
// answers the query. Otherwise only INT columns are searched (other types
// yield an empty result): rows are loaded eight at a time, their values are
// gathered into an aligned buffer and compared against the target with AVX2.
//
// Throws std::runtime_error for a column index outside the schema and
// propagates std::stoi errors when `target` is not a valid integer.
std::vector<size_t> search_simd(size_t col_idx, const std::string& target, bool use_index = true) {
    std::shared_lock lock(transaction_mutex);
    if (use_index && indexes.count(col_idx)) {
        return indexes[col_idx].find(target);
    }
    if (col_idx >= columns.size()) throw std::runtime_error("無効な列インデックス");

    std::vector<size_t> results;
    if (columns[col_idx].type != DataType::INT) return results;

    const int32_t target_val = std::stoi(target);
    constexpr size_t SIMD_WIDTH = 8;
    prefetch_rows(0, std::min<size_t>(SIMD_WIDTH * 2, row_offsets.size()));

    // The broadcast target does not change between batches.
    const __m256i target_vec = _mm256_set1_epi32(target_val);
    for (size_t i = 0; i < row_offsets.size(); i += SIMD_WIDTH) {
        // Zero-initialised so the lanes past the last row hold defined values.
        alignas(32) int32_t values[SIMD_WIDTH] = {};
        for (size_t j = 0; j < SIMD_WIDTH && (i + j) < row_offsets.size(); ++j) {
            load_row(i + j);
            Row* row = cache.get(i + j);
            if (row) std::memcpy(&values[j], row->data.data() + columns[col_idx].offset, sizeof(int32_t));
        }
        const __m256i data_vec = _mm256_load_si256(reinterpret_cast<const __m256i*>(values));
        const __m256i cmp = _mm256_cmpeq_epi32(data_vec, target_vec);
        const int mask = _mm256_movemask_ps(_mm256_castsi256_ps(cmp));
        for (size_t j = 0; j < SIMD_WIDTH && (i + j) < row_offsets.size(); ++j) {
            // Missing rows were compared as 0; the cache check filters them out.
            if ((mask & (1 << j)) && cache.get(i + j)) results.push_back(i + j);
        }
    }
    return results;
}
// Sets column `col_idx` to `new_value` in every cached row, keeps the column's
// index (if any) in sync, and logs an UPDATE_ALL operation.
//
// Tables with 1000 or more rows print a notice about a simulated bulk path;
// the per-row work is the same for every table size.
void update_all(size_t col_idx, const std::string& new_value) {
    std::shared_lock lock(transaction_mutex);
    if (!(row_offsets.size() < 1000)) {
        std::cout << "OpenCLで全行更新(擬似)\n";
    }
    for (size_t row = 0; row < row_offsets.size(); ++row) {
        if (!cache.get(row)) continue;
        std::string previous = get_value(row, col_idx);
        set_value(row, col_idx, new_value);
        if (!indexes.count(col_idx)) continue;
        indexes[col_idx].remove(previous, row);
        indexes[col_idx].insert(new_value, row);
    }
    log_operation("UPDATE_ALL", col_idx, { new_value });
}
// Returns the textual form of column `col_idx` of row `row_idx`.
//
// The row is loaded first; if it is not in the cache afterwards the literal
// "DELETED" is returned. Numeric and date/time columns are formatted with
// std::to_string; string-like columns store an index into `strings`, and the
// referenced entry is returned.
std::string get_value(size_t row_idx, size_t col_idx) {
    std::shared_lock lock(transaction_mutex);
    load_row(row_idx);
    Row* row = cache.get(row_idx);
    if (row == nullptr) return "DELETED";

    char* field = row->data.data() + columns[col_idx].offset;
    // Reads the field as the type of `tag` and formats it.
    auto render = [field](auto tag) {
        return std::to_string(*reinterpret_cast<decltype(tag)*>(field));
    };

    switch (columns[col_idx].type) {
    case DataType::BOOL: return render(bool{});
    case DataType::FLOAT: return render(float{});
    case DataType::DOUBLE: return render(double{});
    case DataType::INT: return render(int32_t{});
    case DataType::SHORT: return render(int16_t{});
    case DataType::BYTE: return render(int8_t{});
    case DataType::LONG: return render(int64_t{});
    case DataType::UINT: return render(uint32_t{});
    case DataType::USHORT: return render(uint16_t{});
    case DataType::UBYTE: return render(uint8_t{});
    case DataType::ULONG: return render(uint64_t{});
    case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
        return strings[*reinterpret_cast<uint32_t*>(field)];
    case DataType::DATETIME: return render(int64_t{});
    case DataType::DATE: return render(int32_t{});
    case DataType::TIME: return render(int32_t{});
    default: return "";
    }
}
// Parses `value` according to the column's type and stores it in column
// `col_idx` of row `row_idx`, then marks the row dirty. Does nothing when the
// row is not available after load_row.
//
// The new bytes are built in a scratch copy of the row and committed only
// after the operation has been logged. log_operation snapshots the cached
// row's bytes as the undo image for "SET", so the cached row must still hold
// the old value at that point; otherwise rollback would restore the new
// value. It also means a conversion error (std::stoi etc.) leaves both the
// row and the log untouched.
void set_value(size_t row_idx, size_t col_idx, const std::string& value) {
    std::shared_lock lock(transaction_mutex);
    load_row(row_idx);
    Row* row = cache.get(row_idx);
    if (!row) return;

    std::vector<char> updated = row->data;
    char* ptr = updated.data() + columns[col_idx].offset;
    // Writes `v` into the column's bytes using v's own type.
    auto store = [ptr](auto v) { *reinterpret_cast<decltype(v)*>(ptr) = v; };
    switch (columns[col_idx].type) {
    case DataType::BOOL: *ptr = std::stoi(value) != 0; break;
    case DataType::FLOAT: store(std::stof(value)); break;
    case DataType::DOUBLE: store(std::stod(value)); break;
    case DataType::INT: store(static_cast<int32_t>(std::stoi(value))); break;
    case DataType::SHORT: store(static_cast<int16_t>(std::stoi(value))); break;
    case DataType::BYTE: store(static_cast<int8_t>(std::stoi(value))); break;
    case DataType::LONG: store(static_cast<int64_t>(std::stoll(value))); break;
    case DataType::UINT: store(static_cast<uint32_t>(std::stoul(value))); break;
    case DataType::USHORT: store(static_cast<uint16_t>(std::stoul(value))); break;
    case DataType::UBYTE: store(static_cast<uint8_t>(std::stoul(value))); break;
    case DataType::ULONG: store(static_cast<uint64_t>(std::stoull(value))); break;
    case DataType::STRING: case DataType::WSTRING: case DataType::BLOB:
        store(static_cast<uint32_t>(get_string_idx(value, columns[col_idx].type))); break;
    case DataType::DATETIME: store(static_cast<int64_t>(std::stoll(value))); break;
    case DataType::DATE: store(static_cast<int32_t>(std::stoi(value))); break;
    case DataType::TIME: store(static_cast<int32_t>(std::stoi(value))); break;
    }

    // Log first (captures the pre-update bytes), then publish the new bytes.
    log_operation("SET", row_idx, { std::to_string(col_idx), value });
    row->data = std::move(updated);
    row->dirty = true;
}
void delete_row (size_t row_idx) {
std ::shared_lock lock (transaction_mutex) ;
load_row(row_idx);
Row* row = cache.get(row_idx);
if (!row) return ;
free_list.push_back({ row_offsets[row_idx], row->compressed_size });
for (size_t i = 0 ; i < columns.size(); ++i) {
if (indexes.count(i)) indexes[i].remove(get_value(row_idx, i), row_idx);
}
cache.put(row_idx, nullptr );
log_operation("DELETE" , row_idx);
}
// Writes the current in-memory state to the shard files (no checkpoint).
void flush() {
    write_file();
}
// Prints every row to stdout, one line per row, as "name: value " pairs in
// column order. Values come from get_value, so rows that are not available
// print as "DELETED".
void print() {
    std::shared_lock lock(transaction_mutex);
    for (size_t row = 0; row < row_offsets.size(); ++row) {
        size_t col = 0;
        while (col < columns.size()) {
            std::cout << columns[col].name << ": " << get_value(row, col) << " ";
            ++col;
        }
        std::cout << "\n";
    }
}
// Builds an index over column `col_idx` from the rows currently in the cache.
// Does nothing if the column already has an index.
//
// Throws std::runtime_error for a column index outside the schema.
//
// The work is split into three phases because get_value takes a shared lock
// on transaction_mutex: calling it while this function holds the exclusive
// lock would lock the same mutex twice on one thread.
//   1. under the exclusive lock: validate and check whether an index exists;
//   2. without the lock: collect (value, row) pairs through get_value;
//   3. under the exclusive lock: create the index and insert the pairs,
//      unless another caller created it in the meantime.
// Rows changed between phases 2 and 3 are indexed with the value read in
// phase 2.
void create_index(size_t col_idx) {
    {
        std::unique_lock lock(transaction_mutex);
        if (col_idx >= columns.size()) throw std::runtime_error("無効な列インデックス");
        if (indexes.count(col_idx)) return;
    }

    std::vector<std::pair<std::string, size_t>> entries;
    for (size_t i = 0; i < row_offsets.size(); ++i) {
        if (cache.get(i)) entries.emplace_back(get_value(i, col_idx), i);
    }

    std::unique_lock lock(transaction_mutex);
    if (indexes.count(col_idx)) return;
    indexes[col_idx] = BTreeIndex(shard_dir + "/" + filename + ".idx" + std::to_string(col_idx));
    for (const auto& entry : entries) {
        indexes[col_idx].insert(entry.first, entry.second);
    }
}
// Returns the cache's statistics string followed by the current log size,
// formatted as "<cache stats>, Log Size: <n> bytes".
std::string stats() {
    std::string report = cache.stats();
    report += ", Log Size: ";
    report += std::to_string(log_size);
    report += " bytes";
    return report;
}
std ::vector <std ::pair<std ::vector <std ::string >, size_t >> group_by(size_t group_col_idx, size_t agg_col_idx, bool sum_agg = false ) {
std ::shared_lock lock (transaction_mutex) ;
std ::map <std ::string , std ::pair<int64_t , size_t >> groups;
for (size_t i = 0 ; i < row_offsets.size(); ++i) {
if (!cache.get(i)) continue ;
std ::string group_val = get_value(i, group_col_idx);
int64_t agg_val = sum_agg ? std ::stoll(get_value(i, agg_col_idx)) : 1 ;
if (groups.count(group_val)) {
if (sum_agg) groups[group_val].first += agg_val;
else groups[group_val].first++;
}
else {
groups[group_val] = { agg_val, i };
}
}
std ::vector <std ::pair<std ::vector <std ::string >, size_t >> results;
for (const auto & [val, data] : groups) {
std ::vector <std ::string > row;
for (size_t j = 0 ; j < columns.size(); ++j) {
if (j == group_col_idx) row.push_back(val);
else if (j == agg_col_idx) row.push_back(std ::to_string(data.first));
else row.push_back(get_value(data.second, j));
}
results.emplace_back(row, data.second);
}
return results;
}
// Sorts `rows` in place by the string in column `sort_col_idx` of each row's
// value vector. The comparison is lexicographic on the strings; `ascending`
// selects the direction.
void order_by(std::vector<std::pair<std::vector<std::string>, size_t>>& rows, size_t sort_col_idx, bool ascending = true) {
    std::shared_lock lock(transaction_mutex);
    auto key_before = [sort_col_idx](const auto& lhs, const auto& rhs) {
        return lhs.first[sort_col_idx] < rhs.first[sort_col_idx];
    };
    if (ascending) {
        std::sort(rows.begin(), rows.end(), key_before);
    } else {
        // "a > b" expressed as "b < a".
        std::sort(rows.begin(), rows.end(),
            [&key_before](const auto& a, const auto& b) { return key_before(b, a); });
    }
}
private :
void recover () {
std ::ifstream log (shard_dir + "/" + filename + ".log" ) ;
std ::string line;
while (std ::getline(log , line)) {
std ::istringstream iss (line) ;
std ::string op;
size_t row_idx;
iss >> op >> row_idx;
if (op == "INSERT" ) {
load_row(row_idx);
}
else if (op == "DELETE" ) {
cache.put(row_idx, nullptr );
free_list.push_back({ row_offsets[row_idx], 0 });
}
else if (op == "SET" ) {
size_t col_idx;
std ::string value;
iss >> col_idx >> value;
set_value(row_idx, col_idx, value);
}
}
write_file();
}
// Calls load_row for every row index in row_offsets, in ascending order.
void load_all_rows() {
    size_t next = 0;
    while (next < row_offsets.size()) {
        load_row(next);
        ++next;
    }
}
};
// [paste artifact] copy
#include <algorithm>
#include <cstdio>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "HddTable.h"
class SQLProcessor
{
private :
std ::map <std ::string , HddTable> tables;
std ::mutex db_mutex;
std ::vector <std ::string > tokenize(const std ::string & sql, size_t & error_pos)
{
std ::vector <std ::string > tokens;
std ::stringstream ss (sql) ;
std ::string token;
size_t pos = 0 ;
while (ss >> token) {
tokens.push_back(token);
pos += token.length() + 1 ;
}
error_pos = pos;
return tokens;
}
void create_table (const std ::vector <std ::string >& tokens, size_t error_pos) {
std ::unique_lock lock (db_mutex) ;
if (tokens.size() < 4 || tokens[0 ] != "CREATE" || tokens[1 ] != "TABLE" ) {
throw std ::runtime_error("無効なCREATE TABLE文 at " + std ::to_string(error_pos));
}
std ::string table_name = tokens[2 ];
if (tables.count(table_name)) {
throw std ::runtime_error("テーブルが既に存在します: " + table_name);
}
tables[table_name] = HddTable(table_name + ".bin" );
tables[table_name].CreateTable();
size_t i = 3 ;
if (tokens[i] != "(" ) throw std ::runtime_error("括弧がありません at " + std ::to_string(error_pos + i));
i++;
while (i < tokens.size() && tokens[i] != ")" ) {
std ::string col_name = tokens[i++];
std ::string type_str = tokens[i++];
DataType type;
if (type_str == "INT" ) type = DataType::INT;
else if (type_str == "FLOAT" ) type = DataType::FLOAT;
else if (type_str == "STRING" ) type = DataType::STRING;
else if (type_str == "BOOL" ) type = DataType::BOOL;
else throw std ::runtime_error("未知のデータ型: " + type_str + " at " + std ::to_string(error_pos + i));
tables[table_name].AddColumn(col_name, type);
if (i < tokens.size() && tokens[i] == "," ) i++;
}
}
void delete_table (const std ::vector <std ::string >& tokens, size_t error_pos) {
std ::unique_lock lock (db_mutex) ;
if (tokens.size() != 3 || tokens[0 ] != "DELETE" || tokens[1 ] != "TABLE" ) {
throw std ::runtime_error("無効なDELETE TABLE文 at " + std ::to_string(error_pos));
}
std ::string table_name = tokens[2 ];
if (!tables.count(table_name)) {
throw std ::runtime_error("テーブルが存在しません: " + table_name);
}
tables.erase(table_name);
std ::remove((table_name + ".bin" ).c_str());
std ::remove((table_name + ".bin.log" ).c_str());
}
void select (const std ::vector <std ::string >& tokens, size_t error_pos) {
std ::shared_lock lock (db_mutex) ;
if (tokens.size() < 4 || tokens[0 ] != "SELECT" || tokens[2 ] != "FROM" ) {
throw std ::runtime_error("無効なSELECT文 at " + std ::to_string(error_pos));
}
std ::string table_name = tokens[3 ];
if (!tables.count(table_name)) {
throw std ::runtime_error("テーブルが存在しません: " + table_name);
}
HddTable& table = tables[table_name];
size_t join_idx = std ::find(tokens.begin(), tokens.end(), "JOIN" ) - tokens.begin();
if (join_idx < tokens.size()) {
bool is_left = (tokens[join_idx - 1 ] == "LEFT" );
std ::string join_table = tokens[join_idx + 1 ];
if (!tables.count(join_table)) {
throw std ::runtime_error("結合テーブルが存在しません: " + join_table);
}
size_t on_idx = std ::find(tokens.begin(), tokens.end(), "ON" ) - tokens.begin();
if (on_idx >= tokens.size() || tokens.size() < on_idx + 4 ) {
throw std ::runtime_error("ON条件が不正 at " + std ::to_string(error_pos + on_idx));
}
std ::string on_col1 = tokens[on_idx + 1 ];
std ::string on_col2 = tokens[on_idx + 3 ];
size_t col1_idx = -1 , col2_idx = -1 ;
for (size_t i = 0 ; i < table.columns.size(); ++i) {
if (table.columns[i].name == on_col1) col1_idx = i;
}
HddTable& join_table_ref = tables[join_table];
for (size_t i = 0 ; i < join_table_ref.columns.size(); ++i) {
if (join_table_ref.columns[i].name == on_col2) col2_idx = i;
}
if (col1_idx == -1 || col2_idx == -1 ) {
throw std ::runtime_error("結合列が見つかりません" );
}
for (size_t i = 0 ; i < table.row_offsets.size(); ++i) {
if (!table.cache.get(i) && !is_left) continue ;
std ::string val1 = table.get_value(i, col1_idx);
auto matches = join_table_ref.search_simd(col2_idx, val1);
if (matches.empty() && is_left) {
for (size_t k = 0 ; k < table.columns.size(); ++k) {
std ::cout << table.columns[k].name << ": " << table.get_value(i, k) << " " ;
}
for (size_t k = 0 ; k < join_table_ref.columns.size(); ++k) {
std ::cout << join_table_ref.columns[k].name << ": NULL " ;
}
std ::cout << "\n" ;
}
for (auto j : matches) {
for (size_t k = 0 ; k < table.columns.size(); ++k) {
std ::cout << table.columns[k].name << ": " << table.get_value(i, k) << " " ;
}
for (size_t k = 0 ; k < join_table_ref.columns.size(); ++k) {
std ::cout << join_table_ref.columns[k].name << ": " << join_table_ref.get_value(j, k) << " " ;
}
std ::cout << "\n" ;
}
}
}
else {
size_t where_idx = std ::find(tokens.begin(), tokens.end(), "WHERE" ) - tokens.begin();
if (where_idx < tokens.size()) {
if (tokens.size() < where_idx + 4 || tokens[where_idx + 2 ] != "=" ) {
throw std ::runtime_error("無効なWHERE条件 at " + std ::to_string(error_pos + where_idx));
}
std ::string col_name = tokens[where_idx + 1 ];
std ::string value = tokens[where_idx + 3 ];
size_t col_idx = -1 ;
for (size_t i = 0 ; i < table.columns.size(); ++i) {
if (table.columns[i].name == col_name) {
col_idx = i;
break ;
}
}
if (col_idx == -1 ) throw std ::runtime_error("列が見つかりません: " + col_name);
auto results = table.search_simd(col_idx, value);
for (auto idx : results) {
for (size_t j = 0 ; j < table.columns.size(); ++j) {
std ::cout << table.columns[j].name << ": " << table.get_value(idx, j) << " " ;
}
std ::cout << "\n" ;
}
}
else {
table.print();
}
}
}
void insert (const std ::vector <std ::string >& tokens, size_t error_pos) {
std ::shared_lock lock (db_mutex) ;
if (tokens.size() < 5 || tokens[0 ] != "INSERT" || tokens[1 ] != "INTO" || tokens[3 ] != "VALUES" ) {
throw std ::runtime_error("無効なINSERT文 at " + std ::to_string(error_pos));
}
std ::string table_name = tokens[2 ];
if (!tables.count(table_name)) {
throw std ::runtime_error("テーブルが存在しません: " + table_name);
}
HddTable& table = tables[table_name];
std ::vector <std ::string > values;
size_t i = 4 ;
if (tokens[i] != "(" ) throw std ::runtime_error("括弧がありません at " + std ::to_string(error_pos + i));
i++;
while (i < tokens.size() && tokens[i] != ")" ) {
values.push_back(tokens[i]);
i++;
if (i < tokens.size() && tokens[i] == "," ) i++;
}
table.insert(values);
}
void update (const std ::vector <std ::string >& tokens, size_t error_pos) {
std ::shared_lock lock (db_mutex) ;
if (tokens.size() < 6 || tokens[0 ] != "UPDATE" || tokens[2 ] != "SET" ) {
throw std ::runtime_error("無効なUPDATE文 at " + std ::to_string(error_pos));
}
std ::string table_name = tokens[1 ];
if (!tables.count(table_name)) {
throw std ::runtime_error("テーブルが存在しません: " + table_name);
}
HddTable& table = tables[table_name];
std ::string col_name = tokens[3 ];
if (tokens[4 ] != "=" ) throw std ::runtime_error("SET構文エラー at " + std ::to_string(error_pos + 4 ));
std ::string value = tokens[5 ];
size_t col_idx = -1 ;
for (size_t i = 0 ; i < table.columns.size(); ++i) {
if (table.columns[i].name == col_name) {
col_idx = i;
break ;
}
}
if (col_idx == -1 ) throw std ::runtime_error("列が見つかりません: " + col_name);
size_t where_idx = std ::find(tokens.begin(), tokens.end(), "WHERE" ) - tokens.begin();
if (where_idx < tokens.size()) {
std ::string where_col = tokens[where_idx + 1 ];
std ::string where_value = tokens[where_idx + 3 ];
size_t where_col_idx = -1 ;
for (size_t i = 0 ; i < table.columns.size(); ++i) {
if (table.columns[i].name == where_col) {
where_col_idx = i;
break ;
}
}
if (where_col_idx == -1 ) throw std ::runtime_error("WHERE列が見つかりません: " + where_col);
auto rows = table.search_simd(where_col_idx, where_value);
for (auto row_idx : rows) {
table.set_value(row_idx, col_idx, value);
}
}
else {
table.update_all(col_idx, value);
}
table.flush();
}
void delete_rows (const std ::vector <std ::string >& tokens, size_t error_pos) {
std ::shared_lock lock (db_mutex) ;
if (tokens.size() < 3 || tokens[0 ] != "DELETE" || tokens[1 ] != "FROM" ) {
throw std ::runtime_error("無効なDELETE文 at " + std ::to_string(error_pos));
}
std ::string table_name = tokens[2 ];
if (!tables.count(table_name)) {
throw std ::runtime_error("テーブルが存在しません: " + table_name);
}
HddTable& table = tables[table_name];
size_t where_idx = std ::find(tokens.begin(), tokens.end(), "WHERE" ) - tokens.begin();
if (where_idx < tokens.size()) {
std ::string col_name = tokens[where_idx + 1 ];
std ::string value = tokens[where_idx + 3 ];
size_t col_idx = -1 ;
for (size_t i = 0 ; i < table.columns.size(); ++i) {
if (table.columns[i].name == col_name) {
col_idx = i;
break ;
}
}
if (col_idx == -1 ) throw std ::runtime_error("列が見つかりません: " + col_name);
auto rows = table.search_simd(col_idx, value);
for (auto row_idx : rows) {
table.delete_row(row_idx);
}
}
else {
for (size_t i = 0 ; i < table.row_offsets.size(); ++i) {
table.delete_row(i);
}
}
table.flush();
}
void create_index (const std ::vector <std ::string >& tokens, size_t error_pos) {
std ::shared_lock lock (db_mutex) ;
if (tokens.size() < 5 || tokens[0 ] != "CREATE" || tokens[1 ] != "INDEX" || tokens[3 ] != "ON" ) {
throw std ::runtime_error("無効なCREATE INDEX文 at " + std ::to_string(error_pos));
}
std ::string table_name = tokens[4 ];
if (!tables.count(table_name)) {
throw std ::runtime_error("テーブルが存在しません: " + table_name);
}
std ::string col_name = tokens[5 ].substr(1 , tokens[5 ].size() - 2 );
HddTable& table = tables[table_name];
size_t col_idx = -1 ;
for (size_t i = 0 ; i < table.columns.size(); ++i) {
if (table.columns[i].name == col_name) {
col_idx = i;
break ;
}
}
if (col_idx == -1 ) throw std ::runtime_error("列が見つかりません: " + col_name);
table.create_index(col_idx);
}
public :
void execute (const std ::string & sql) {
size_t error_pos;
std ::vector <std ::string > tokens = tokenize(sql, error_pos);
if (tokens.empty()) return ;
try {
if (tokens[0 ] == "CREATE" && tokens[1 ] == "TABLE" ) {
create_table(tokens, error_pos);
}
else if (tokens[0 ] == "DELETE" && tokens[1 ] == "TABLE" ) {
delete_table(tokens, error_pos);
}
else if (tokens[0 ] == "SELECT" ) {
select(tokens, error_pos);
}
else if (tokens[0 ] == "INSERT" ) {
insert(tokens, error_pos);
}
else if (tokens[0 ] == "UPDATE" ) {
update(tokens, error_pos);
}
else if (tokens[0 ] == "DELETE" ) {
delete_rows(tokens, error_pos);
}
else if (tokens[0 ] == "CREATE" && tokens[1 ] == "INDEX" ) {
create_index(tokens, error_pos);
}
else if (tokens[0 ] == "BEGIN" ) {
tables[tokens[1 ]].begin_transaction();
}
else if (tokens[0 ] == "COMMIT" ) {
tables[tokens[1 ]].commit();
}
else if (tokens[0 ] == "ROLLBACK" ) {
tables[tokens[1 ]].rollback();
}
else if (tokens[0 ] == "SHOW" && tokens[1 ] == "STATS" ) {
std ::cout << tables[tokens[2 ]].stats() << "\n" ;
}
else {
throw std ::runtime_error("未知のSQLコマンド: " + tokens[0 ] + " at " + std ::to_string(error_pos));
}
}
catch (const std ::exception& e) {
std ::cerr << "エラー: " << e.what() << "\n" ;
}
}
void run_cli () {
std ::string input;
std ::cout << "SQL> " ;
while (std ::getline(std ::cin , input)) {
if (input == "EXIT" ) break ;
execute(input);
std ::cout << "SQL> " ;
}
}
};
// [paste artifact] copy
// [paste artifact] comment