Files
Sipro-Marketplaces/internal/redis/client.go

133 lines
3.1 KiB
Go

package redis
import (
"bytes"
"compress/flate"
"compress/zlib"
"context"
"fmt"
"io"
"os"
"time"
"github.com/golang/protobuf/proto"
"github.com/redis/rueidis"
)
// Client is the package-level Redis client shared by the helpers in this
// package. It is nil until InitClient completes successfully.
var Client *rueidis.Client
// InitClient creates a Redis client from the REDIS_HOST, REDIS_PORT and
// REDIS_PASSWORD environment variables, verifies the connection with a PING
// and, on success, publishes the client through the package-level Client.
//
// It returns the error from client creation or from the PING unchanged. When
// the PING fails the new client is closed before returning, so a failed
// initialization does not leave an unused client behind; Client is left
// untouched in that case.
func InitClient(ctx context.Context) error {
	host := os.Getenv("REDIS_HOST")
	port := os.Getenv("REDIS_PORT")
	password := os.Getenv("REDIS_PASSWORD")

	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{host + ":" + port},
		Password:    password,
	})
	if err != nil {
		return err
	}

	if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil {
		// The client is never published on this path, so release it here.
		client.Close()
		return err
	}

	Client = &client
	return nil
}
// CloseClient closes the package-level Client. It does nothing when no client
// has been set.
func CloseClient() {
	if Client == nil {
		return
	}
	rdb := *Client
	rdb.Close()
}
// WriteProtoMessage marshals message, compresses the resulting bytes with
// zlib at the best-compression level and stores them in Redis under key.
//
// The entry expires after ttl[0] when a TTL is supplied (any further values
// are ignored) and after three hours otherwise.
//
// An error is returned when the client has not been initialized, when message
// is nil, or when marshaling, compression or the Redis write fails.
func WriteProtoMessage(ctx context.Context, key string, message proto.Message, ttl ...time.Duration) error {
	// defaultTTL is applied when the caller does not pass a TTL.
	const defaultTTL = 3 * time.Hour

	if Client == nil {
		return fmt.Errorf("redis client not initialized")
	}
	if message == nil {
		return fmt.Errorf("message is nil")
	}

	// Marshal the protobuf message.
	bytesMessage, err := proto.Marshal(message)
	if err != nil {
		return fmt.Errorf("failed to marshal proto message: %w", err)
	}

	// Compress with zlib into an in-memory buffer.
	var buf bytes.Buffer
	w, err := zlib.NewWriterLevel(&buf, flate.BestCompression)
	if err != nil {
		return fmt.Errorf("failed to create zlib writer: %w", err)
	}
	if _, err := w.Write(bytesMessage); err != nil {
		return fmt.Errorf("failed to write to zlib writer: %w", err)
	}
	// Close finalizes the compressed stream, so buf must not be read before
	// this point. The writer is closed exactly once here rather than also via
	// defer, which would only repeat the call after buf has been used.
	if err := w.Close(); err != nil {
		return fmt.Errorf("failed to close zlib writer: %w", err)
	}

	ttlDuration := defaultTTL
	if len(ttl) > 0 {
		ttlDuration = ttl[0]
	}

	// Write the compressed payload to Redis with the chosen expiry.
	if err := (*Client).Do(ctx, (*Client).B().
		Set().
		Key(key).
		Value(rueidis.BinaryString(buf.Bytes())).
		Ex(ttlDuration).
		Build()).
		Error(); err != nil {
		return fmt.Errorf("failed to write compressed data to Redis: %w", err)
	}
	return nil
}
// ReadProtoMessage fetches the value stored under key, inflates it with zlib
// and unmarshals the decompressed bytes into message.
//
// An error is returned when the client has not been initialized, when message
// is nil, when the Redis read fails or yields a nil byte slice, or when the
// payload cannot be decompressed or unmarshaled.
func ReadProtoMessage(ctx context.Context, key string, message proto.Message) error {
	switch {
	case Client == nil:
		return fmt.Errorf("redis client not initialized")
	case message == nil:
		return fmt.Errorf("message is nil")
	}

	// Fetch the raw (compressed) bytes.
	rdb := *Client
	getCmd := rdb.B().Get().Key(key).Build()
	payload, err := rdb.Do(ctx, getCmd).AsBytes()
	if err != nil {
		return fmt.Errorf("failed to read data from Redis: %w", err)
	}
	// NOTE(review): a missing key presumably surfaces as an error from AsBytes
	// above rather than as a nil slice — confirm against rueidis.
	if payload == nil {
		return fmt.Errorf("no data found for key: %s", key)
	}

	// Inflate the zlib stream fully into memory.
	zr, err := zlib.NewReader(bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("failed to create zlib reader: %w", err)
	}
	defer zr.Close()

	raw, err := io.ReadAll(zr)
	if err != nil {
		return fmt.Errorf("failed to decompress data: %w", err)
	}

	// Decode the protobuf payload into the caller-supplied message.
	if err = proto.Unmarshal(raw, message); err != nil {
		return fmt.Errorf("failed to unmarshal proto message: %w", err)
	}
	return nil
}