133 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			133 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package redis
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"compress/flate"
 | 
						|
	"compress/zlib"
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"os"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/golang/protobuf/proto"
 | 
						|
	"github.com/redis/rueidis"
 | 
						|
)
 | 
						|
 | 
						|
// Client is the package-level Redis client. It is nil until InitClient
// succeeds, and it is the client that CloseClient closes.
var Client *rueidis.Client
 | 
						|
 | 
						|
func InitClient(ctx context.Context) error {
 | 
						|
	var err error
 | 
						|
	host := os.Getenv("REDIS_HOST")
 | 
						|
	//host := "redis"
 | 
						|
	port := os.Getenv("REDIS_PORT")
 | 
						|
	password := os.Getenv("REDIS_PASSWORD")
 | 
						|
 | 
						|
	client, err := rueidis.NewClient(rueidis.ClientOption{
 | 
						|
		InitAddress: []string{host + ":" + port},
 | 
						|
		Password:    password,
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	err = client.Do(ctx, client.B().Ping().Build()).Error()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	Client = &client
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func CloseClient() {
 | 
						|
	if Client != nil {
 | 
						|
		(*Client).Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// WriteProtoMessage compresses and writes a protobuf message to Redis
 | 
						|
func WriteProtoMessage(ctx context.Context, key string, message proto.Message, ttl ...time.Duration) error {
 | 
						|
	if Client == nil {
 | 
						|
		return fmt.Errorf("redis client not initialized")
 | 
						|
	}
 | 
						|
	if message == nil {
 | 
						|
		return fmt.Errorf("message is nil")
 | 
						|
	}
 | 
						|
 | 
						|
	// Marshal protobuf message
 | 
						|
	bytesMessage, err := proto.Marshal(message)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("failed to marshal proto message: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Compress with zlib
 | 
						|
	var buf bytes.Buffer
 | 
						|
	w, err := zlib.NewWriterLevel(&buf, flate.BestCompression)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("failed to create zlib writer: %w", err)
 | 
						|
	}
 | 
						|
	defer w.Close() // гарантированное закрытие
 | 
						|
 | 
						|
	if _, err := w.Write(bytesMessage); err != nil {
 | 
						|
		return fmt.Errorf("failed to write to zlib writer: %w", err)
 | 
						|
	}
 | 
						|
	if err := w.Close(); err != nil { // финализируем сжатие
 | 
						|
		return fmt.Errorf("failed to close zlib writer: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	var ttlDuration time.Duration
 | 
						|
	if len(ttl) > 0 {
 | 
						|
		ttlDuration = ttl[0]
 | 
						|
	} else {
 | 
						|
		ttlDuration = 3 * time.Hour // Default TTL of 24 hours
 | 
						|
	}
 | 
						|
	// Write to Redis
 | 
						|
	if err := (*Client).Do(ctx, (*Client).B().
 | 
						|
		Set().
 | 
						|
		Key(key).
 | 
						|
		Value(rueidis.BinaryString(buf.Bytes())).
 | 
						|
		Ex(ttlDuration).
 | 
						|
		Build()).
 | 
						|
		Error(); err != nil {
 | 
						|
		return fmt.Errorf("failed to write compressed data to Redis: %w", err)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// ReadProtoMessage reads and decompresses a protobuf message from Redis
 | 
						|
func ReadProtoMessage(ctx context.Context, key string, message proto.Message) error {
 | 
						|
	if Client == nil {
 | 
						|
		return fmt.Errorf("redis client not initialized")
 | 
						|
	}
 | 
						|
	if message == nil {
 | 
						|
		return fmt.Errorf("message is nil")
 | 
						|
	}
 | 
						|
 | 
						|
	// Get bytes from Redis
 | 
						|
	resp, err := (*Client).Do(ctx, (*Client).B().Get().Key(key).Build()).AsBytes()
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("failed to read data from Redis: %w", err)
 | 
						|
	}
 | 
						|
	if resp == nil {
 | 
						|
		return fmt.Errorf("no data found for key: %s", key)
 | 
						|
	}
 | 
						|
 | 
						|
	// Decompress
 | 
						|
	reader, err := zlib.NewReader(bytes.NewReader(resp))
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("failed to create zlib reader: %w", err)
 | 
						|
	}
 | 
						|
	defer reader.Close()
 | 
						|
 | 
						|
	decompressed, err := io.ReadAll(reader)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("failed to decompress data: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Unmarshal protobuf
 | 
						|
	if err := proto.Unmarshal(decompressed, message); err != nil {
 | 
						|
		return fmt.Errorf("failed to unmarshal proto message: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 |