148 lines
3.3 KiB
Go
148 lines
3.3 KiB
Go
package pebble
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/cockroachdb/pebble"
|
|
"github.com/d1nch8g/mesh/database"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type files struct {
|
|
db *pebble.DB
|
|
}
|
|
|
|
// fileChunkSize is the size in bytes of the read buffer used when storing a
// file, and therefore the upper bound on a single stored chunk (4 MiB).
const fileChunkSize = 4 * 1024 * 1024
|
|
|
|
func (f *files) Put(r io.Reader) (uuid.UUID, error) {
|
|
id := uuid.New()
|
|
|
|
batch := f.db.NewBatch()
|
|
defer batch.Close()
|
|
|
|
var totalSize int64
|
|
chunk := make([]byte, fileChunkSize)
|
|
var chunkIndex int
|
|
hasData := false
|
|
|
|
for {
|
|
n, err := r.Read(chunk)
|
|
if n > 0 {
|
|
k := key(id.String(), prefixFiles, fmt.Sprintf("%d", chunkIndex))
|
|
buf := make([]byte, n)
|
|
copy(buf, chunk[:n])
|
|
if err := batch.Set(k, buf, pebble.NoSync); err != nil {
|
|
return uuid.Nil, fmt.Errorf("store chunk %d of %q: %w", chunkIndex, id, err)
|
|
}
|
|
totalSize += int64(n)
|
|
chunkIndex++
|
|
hasData = true
|
|
}
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return uuid.Nil, fmt.Errorf("read file %q: %w", id, err)
|
|
}
|
|
}
|
|
|
|
if !hasData {
|
|
k := key(id.String(), prefixFiles, "0")
|
|
if err := batch.Set(k, []byte{}, pebble.NoSync); err != nil {
|
|
return uuid.Nil, fmt.Errorf("store empty chunk for %q: %w", id, err)
|
|
}
|
|
}
|
|
|
|
metaKey := key(id.String(), prefixFiles, prefixMeta)
|
|
var sizeBuf [8]byte
|
|
binary.LittleEndian.PutUint64(sizeBuf[:], uint64(totalSize))
|
|
if err := batch.Set(metaKey, sizeBuf[:], pebble.NoSync); err != nil {
|
|
return uuid.Nil, fmt.Errorf("store meta for %q: %w", id, err)
|
|
}
|
|
|
|
if err := batch.Commit(pebble.NoSync); err != nil {
|
|
return uuid.Nil, fmt.Errorf("commit file %q: %w", id, err)
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
func (f *files) Get(id uuid.UUID, w io.Writer) error {
|
|
chunkIndex := 0
|
|
for {
|
|
k := key(id.String(), prefixFiles, fmt.Sprintf("%d", chunkIndex))
|
|
data, closer, err := f.db.Get(k)
|
|
if err != nil {
|
|
if err == pebble.ErrNotFound {
|
|
if chunkIndex == 0 {
|
|
return database.ErrNotFound
|
|
}
|
|
return nil // end of chunks
|
|
}
|
|
return fmt.Errorf("read file %q chunk %d: %w", id, chunkIndex, err)
|
|
}
|
|
|
|
if _, err := w.Write(data); err != nil {
|
|
closer.Close()
|
|
return fmt.Errorf("write file %q: %w", id, err)
|
|
}
|
|
closer.Close()
|
|
chunkIndex++
|
|
}
|
|
}
|
|
|
|
func (f *files) Delete(id uuid.UUID) error {
|
|
batch := f.db.NewBatch()
|
|
defer batch.Close()
|
|
|
|
chunkIndex := 0
|
|
foundAny := false
|
|
for {
|
|
k := key(id.String(), prefixFiles, fmt.Sprintf("%d", chunkIndex))
|
|
_, closer, err := f.db.Get(k)
|
|
if err != nil {
|
|
if err == pebble.ErrNotFound {
|
|
break
|
|
}
|
|
return fmt.Errorf("check file %q chunk %d: %w", id, chunkIndex, err)
|
|
}
|
|
closer.Close()
|
|
if err := batch.Delete(k, pebble.NoSync); err != nil {
|
|
return fmt.Errorf("delete chunk %d of %q: %w", chunkIndex, id, err)
|
|
}
|
|
foundAny = true
|
|
chunkIndex++
|
|
}
|
|
|
|
if !foundAny {
|
|
return database.ErrNotFound
|
|
}
|
|
|
|
metaKey := key(id.String(), prefixFiles, prefixMeta)
|
|
if err := batch.Delete(metaKey, pebble.NoSync); err != nil {
|
|
return fmt.Errorf("delete meta for %q: %w", id, err)
|
|
}
|
|
|
|
if err := batch.Commit(pebble.NoSync); err != nil {
|
|
return fmt.Errorf("commit delete %q: %w", id, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *files) Size(id uuid.UUID) (int64, error) {
|
|
metaKey := key(id.String(), prefixFiles, prefixMeta)
|
|
data, closer, err := f.db.Get(metaKey)
|
|
if err != nil {
|
|
if err == pebble.ErrNotFound {
|
|
return 0, database.ErrNotFound
|
|
}
|
|
return 0, fmt.Errorf("size file %q: %w", id, err)
|
|
}
|
|
defer closer.Close()
|
|
|
|
return int64(binary.LittleEndian.Uint64(data)), nil
|
|
}
|