COPY with Go and PostgreSQL
Batch data inserts and complex queries tend to be the two big performance problems with databases. This time I'll set the latter aside and deal with the former: getting data in quickly. I think speeding up data loading has become somewhat more important recently, because running batch jobs in the cloud comes with various restrictions.
Cloud service and batch time limits
AWS Lambda is limited to 15 minutes (900 seconds), and GCP's Cloud Functions to 9 minutes (540 seconds). If that is enough time, operation is very easy: hook the function up to a scheduler service and run it with Lambda on a regular basis. With a somewhat stricter limit, AWS's API Gateway cuts requests off at 30 seconds. If you fit within that, you can upload data from a serverless admin screen and do something like a bulk insert quite easily.
Other services allow a bit longer: GCP's Cloud Run allows up to an hour, and while AWS's App Runner has not announced such a limit yet, I expect it is about the same. If you use EC2 or the like there is no time limit, but if requests go through an ALB or similar they get cut off after about 90 minutes, even though that is not in the documentation.
If time is tight, you can issue a signed URL in advance to save on transfer time, upload to S3 and process from there, or push rows onto a queue one by one and fan out with Lambda. But the fewer moving parts there are, the easier debugging and troubleshooting become. In any case, there are restrictions everywhere, and the faster the load is, the simpler the architecture you can choose, the easier it is to operate, and the cheaper it gets. Speed is justice.
COPY FROM?
I had heard that PostgreSQL has COPY FROM, which loads files at high speed, but I didn't know the details, so I looked it up.
There are COPY and \copy.
- COPY can work with local files on the DB server (COPY FROM reads from a local file into a table, COPY TO writes from a table to a local file). pg_dump apparently uses COPY FROM/TO internally — perhaps it transfers the data using something like COPY FROM STDIN / COPY TO STDOUT?
- \copy is also available between client and server.
- It is reportedly 11 times faster than SQL with individual INSERTs, and more than 3 times faster than processing the INSERTs in a single transaction.
There are two variants, but it seems there is no real need to worry much about which one to use when.
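For reference, a rough sketch of what the two variants look like (the table name, column names, and file paths here are hypothetical):
-- Server-side COPY: the path is resolved on the DB server itself
COPY my_table (col_a, col_b) FROM '/path/on/the/db/server/data.csv' WITH (FORMAT csv);
-- Client-side \copy in psql: psql reads the local file and streams it via COPY FROM STDIN
\copy my_table (col_a, col_b) from 'data.csv' with (format csv)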
Go and COPY
There are two major PostgreSQL drivers for Go: lib/pq and pgx.
Of the two, pgx seems to have the better performance. lib/pq has more GitHub stars, but pgx's count is not small either.
lib/pq has a bulk import feature that uses COPY, and pgx has COPY protocol support as well.
The implementation approach differs a bit. pgx defines its own Conn type (a unique extension that is upward compatible with the database/sql interface), and CopyFrom() is a method on that Conn. lib/pq, by contrast, is implemented on top of the standard Prepare/Exec interface.
Some OR mappers wrap the connection completely and hide the underlying Conn (gorm, perhaps?), so in that case you would use lib/pq, or pick whichever fits the situation. Then again, for bulk data loading there is no particular need to match your production code's architecture or go through an OR mapper, so just using pgx directly seems fine.
Trying it out (preparation)
This assumes that Python and Poetry are already installed.
$ poetry new conv-toriki
$ cd conv-toriki
$ poetry add tabula-py
The conversion script (convert.py) looks like this:
import tabula
tabula.convert_into("toriki_allergie_21su.pdf", "output.csv", output_format="csv", pages=[2, 3, 4, 5])
Run it as follows. It produces a CSV file, so I removed the header rows by hand (this could probably be automated).
$ poetry run python convert.py
PostgreSQL, meanwhile, is installed with Docker and started like this:
$ docker pull postgres:13.3
$ docker run -d --rm --name db -e POSTGRES_USER=pg -e POSTGRES_PASSWORD=pw -e POSTGRES_DB=toriki -p 5432:5432 postgres:13.3
Start psql in this container and create a table.
$ docker exec -it db psql -U pg -d toriki
psql (13.3 (Debian 13.3-1.pgdg100+1))
Type "help" for help.
toriki=# create table allergies (
toriki(#   id serial PRIMARY KEY,
toriki(#   category varchar(50) not null,
toriki(#   menu varchar(50) not null,
toriki(#   shrimp boolean,
toriki(#   crab boolean,
toriki(#   wheat boolean,
toriki(#   soba boolean,
toriki(#   eggs boolean,
toriki(#   milk boolean,
toriki(#   peanuts boolean,
toriki(#   walnuts boolean
toriki(# );
CREATE TABLE
Usage example with lib/pq
This is a sample that reads the CSV and streams it in with COPY. In pq.CopyIn(), the first argument is the table name and the remaining arguments are the column names. The emoji are just markers to make it easy to see where an error occurred (using log.SetFlags would make the sample a little longer).
Each row is added with stmt.ExecContext(), and the final ExecContext() with no arguments followed by stmt.Close() sends all the rows to the server in a single request. I haven't dug into the internal implementation, but if everything is buffered in memory, it may be better to flush every few thousand rows (a chunked variation is sketched after this sample).
package main
import (
"context"
"database/sql"
"encoding/csv"
"io"
"log"
"os"
"os/signal"
_ "github.com/lib/pq"
)
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
f, err := os.Open("../../output.csv")
if err != nil {
log.Fatal("🐙", err)
}
r := csv.NewReader(f)
r.FieldsPerRecord = -1
connStr := "host=localhost port=5432 user=pg password=pw dbname=toriki sslmode=disable"
db, err := sql.Open("postgres", connStr)
if err != nil {
log.Fatal("🦑", err)
}
txn, err := db.Begin()
if err != nil {
log.Fatal("🐣", err)
}
stmt, err := txn.Prepare(pq.CopyIn("allergies",
"category", "menu",
"shrimp", "crab", "wheat", "soba", "eggs", "milk", "peanuts", "walnuts"))
if err != nil {
log.Fatal("🐵", err)
}
for {
record, err := r.Read()
log.Println(record, err)
if err == io.EOF {
break
} else if err != nil {
log.Fatal("🐍", err)
}
_, err = stmt.ExecContext(ctx,
record[0], record[1], record[2] != "", record[3] != "", record[4] != "", record[5] != "", record[6] != "", record[7] != "", record[8] != "", record[9] != "")
if err != nil {
log.Fatal("🐹", err)
}
}
_, err = stmt.ExecContext(ctx)
if err != nil {
log.Fatal("🐸", err)
}
err = stmt.Close()
if err != nil {
log.Fatal("🐶", err)
}
err = txn.Commit()
if err != nil {
log.Fatal("🐱", err)
}
}
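As a reference for the buffering point above, here is a rough sketch of a chunked variation. This is my own idea rather than something from the lib/pq documentation: each chunk of rows gets its own CopyIn statement that is flushed and closed before the next one starts, all inside a single transaction. The chunk size, the toRow/flushChunk helper names, and the omission of context/signal handling are all choices made for the sketch.
package main

import (
	"database/sql"
	"encoding/csv"
	"io"
	"log"
	"os"

	"github.com/lib/pq"
)

// toRow converts one CSV record into the column values of the allergies table.
func toRow(record []string) []interface{} {
	return []interface{}{
		record[0], record[1],
		record[2] != "", record[3] != "", record[4] != "", record[5] != "",
		record[6] != "", record[7] != "", record[8] != "", record[9] != "",
	}
}

// flushChunk sends one chunk of rows as a single COPY statement.
func flushChunk(txn *sql.Tx, rows [][]interface{}) error {
	stmt, err := txn.Prepare(pq.CopyIn("allergies",
		"category", "menu",
		"shrimp", "crab", "wheat", "soba", "eggs", "milk", "peanuts", "walnuts"))
	if err != nil {
		return err
	}
	for _, row := range rows {
		if _, err := stmt.Exec(row...); err != nil {
			return err
		}
	}
	// The final empty Exec flushes the buffered rows to the server.
	if _, err := stmt.Exec(); err != nil {
		return err
	}
	return stmt.Close()
}

func main() {
	const chunkSize = 5000 // arbitrary; tune against real data

	f, err := os.Open("../../output.csv")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	r := csv.NewReader(f)
	r.FieldsPerRecord = -1

	db, err := sql.Open("postgres",
		"host=localhost port=5432 user=pg password=pw dbname=toriki sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	txn, err := db.Begin()
	if err != nil {
		log.Fatal(err)
	}

	chunk := make([][]interface{}, 0, chunkSize)
	for {
		record, err := r.Read()
		if err == io.EOF {
			break
		} else if err != nil {
			log.Fatal(err)
		}
		chunk = append(chunk, toRow(record))
		if len(chunk) == chunkSize {
			if err := flushChunk(txn, chunk); err != nil {
				log.Fatal(err)
			}
			chunk = chunk[:0]
		}
	}
	if len(chunk) > 0 {
		if err := flushChunk(txn, chunk); err != nil {
			log.Fatal(err)
		}
	}
	if err := txn.Commit(); err != nil {
		log.Fatal(err)
	}
}
Whether this actually lowers peak memory depends on where the driver buffers the COPY data, so treat the chunk size as something to measure rather than a fixed recommendation.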
Usage example with pgx
With pgx, you need to provide an implementation of the pgx.CopyFromSource interface on the application side. There are also convenience helpers that build this interface from slices (such as pgx.CopyFromRows), but since those require either all the data in memory up front or a known row count, they didn't seem suitable for very large inputs. So this time I wrote my own implementation that wraps csv.Reader. Internally the rows appear to be streamed sequentially using the binary protocol, so it looks like everything can be processed without holding it all in memory (further verification needed).
package main
import (
"context"
"encoding/csv"
"io"
"log"
"os"
"os/signal"
"github.com/jackc/pgx/v4"
)
type copyFromSource struct {
r *csv.Reader
nextRow []interface{}
err error
}
func (s *copyFromSource) Next() bool {
s.nextRow = nil
s.err = nil
record, err := s.r.Read()
if err == io.EOF {
return false
} else if err != nil {
s.err = err
return false
}
s.nextRow = []interface{}{
record[0], record[1],
record[2] != "", record[3] != "", record[4] != "", record[5] != "",
record[6] != "", record[7] != "", record[8] != "", record[9] != "",
}
return true
}
func (s *copyFromSource) Values() ([]interface{}, error) {
if s.err != nil {
return nil, s.err
}
return s.nextRow, nil
}
func (s *copyFromSource) Err() error {
return s.err
}
var _ pgx.CopyFromSource = &copyFromSource{}
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
f, err := os.Open("../../output.csv")
if err != nil {
log.Fatal("🐙", err)
}
r := csv.NewReader(f)
r.FieldsPerRecord = -1
conn, err := pgx.Connect(ctx, "postgres://pg:pw@localhost:5432/toriki")
if err != nil {
log.Fatal("🦑", err)
}
txn, err := conn.Begin(ctx)
if err != nil {
log.Fatal("🐣", err)
}
_, err = txn.CopyFrom(ctx, pgx.Identifier{"allergies"}, []string{
"category", "menu",
"shrimp", "crab", "wheat", "soba", "eggs", "milk", "peanuts", "walnuts",
}, &copyFromSource{r: r})
if err != nil {
log.Fatal("🐬", err)
}
err = txn.Commit(ctx)
if err != nil {
log.Fatal("🐱", err)
}
}
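For comparison, when the data already fits comfortably in memory, the convenience helper mentioned above can replace the hand-written CopyFromSource. A minimal sketch follows; the example row values are made up for illustration.
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v4"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://pg:pw@localhost:5432/toriki")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	// All rows are built up front in memory, which is fine for small inputs.
	rows := [][]interface{}{
		{"drink", "Mega Kinmugi", false, false, true, false, false, false, false, false},
	}
	copied, err := conn.CopyFrom(ctx, pgx.Identifier{"allergies"}, []string{
		"category", "menu",
		"shrimp", "crab", "wheat", "soba", "eggs", "milk", "peanuts", "walnuts",
	}, pgx.CopyFromRows(rows))
	if err != nil {
		log.Fatal(err)
	}
	log.Println("rows copied:", copied)
}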
Summary
I don't usually use relational databases much (mostly NoSQL), so this was partly a warm-up exercise: I looked things up and wrote the code. COPY is a DB-specific feature, but even if you switch databases, falling back to plain INSERT is not painful, and the speedup is large, so it seems worth considering for bulk batch loading. And since it works with both lib/pq and pgx, you get the benefit no matter which driver your application has chosen.
Now you can easily search for menu items that do or don't contain specific allergens. So Mega Kinmugi really does contain wheat…
toriki=# select menu from allergies where wheat = true;
Menu
------------------------------
Thigh Kizoku-yaki (sauce)
Breast Kizoku-yaki (sauce)
Tsukune Salt
Tsukune
:
Garlic Chives Teppan-yaki
Spicy Konnyaku Tatsuta Fried
Mega Kinmugi (Beer Beverage)
(49 rows)
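Conversely, a sketch of the "does not contain" direction, picking wheat and eggs as the allergens to avoid:
toriki=# select menu from allergies where not (wheat or eggs);
Since the loader always writes explicit true/false values, there are no NULLs to worry about in this condition.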