From 3fd63d5f32bbf1fc53a74dee2b9f6af5d054f630 Mon Sep 17 00:00:00 2001 From: admin Date: Sun, 28 Sep 2025 20:19:45 +0300 Subject: [PATCH] temp --- cmd/server/main.go | 148 ++------------- go.mod | 2 + go.sum | 4 + internal/config/config.go | 5 +- internal/config/fx.go | 7 + internal/db/conn.go | 23 ++- internal/db/fx.go | 16 ++ internal/marketplace/adapter_grpc.go | 15 +- internal/marketplace/fx.go | 12 ++ internal/ozon/common.go | 10 +- internal/ozon/products/adapter_grpc.go | 13 +- internal/ozon/products/fx.go | 8 + internal/ozon/products/repository_api.go | 108 +++++------ internal/ozon/rate_limiter.go | 11 +- internal/redis/client.go | 222 +++++++++++------------ internal/redis/fx.go | 8 + internal/redis/lock.go | 23 ++- internal/redis/utils.go | 33 ++++ internal/transport/grpc/fx.go | 7 + internal/transport/grpc/server.go | 29 +++ internal/wb/rate_limiter.go | 7 +- 21 files changed, 356 insertions(+), 355 deletions(-) create mode 100644 internal/config/fx.go create mode 100644 internal/db/fx.go create mode 100644 internal/marketplace/fx.go create mode 100644 internal/ozon/products/fx.go create mode 100644 internal/redis/fx.go create mode 100644 internal/redis/utils.go create mode 100644 internal/transport/grpc/fx.go create mode 100644 internal/transport/grpc/server.go diff --git a/cmd/server/main.go b/cmd/server/main.go index dc14b5f..63694f8 100755 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,148 +1,24 @@ package main import ( - "context" - "fmt" - "net" - "os" "sipro-mps/internal/config" + "sipro-mps/internal/db" "sipro-mps/internal/marketplace" - ozon "sipro-mps/internal/ozon/products" + ozon_products "sipro-mps/internal/ozon/products" "sipro-mps/internal/redis" - "sipro-mps/internal/tasks/client" - wb "sipro-mps/internal/wb/products" - ym "sipro-mps/internal/ym/products" + "sipro-mps/internal/transport/grpc" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/joho/godotenv" - "google.golang.org/grpc" + "go.uber.org/fx" _ "google.golang.org/grpc/encoding/gzip" ) -func logMessage(level string, format string, a ...interface{}) { - const ( - green = "\033[32m" - red = "\033[31m" - yellow = "\033[33m" - blue = "\033[34m" - reset = "\033[0m" - ) - switch level { - case "info": - fmt.Printf("%s✅ [INFO]%s %s %s\n", green, reset, fmt.Sprintf(format, a...), "ℹ️") - case "error": - fmt.Printf("%s❌ [ERROR]%s %s %s\n", red, reset, fmt.Sprintf(format, a...), "🚨") - default: - fmt.Printf("%s[LOG]%s %s\n", blue, reset, fmt.Sprintf(format, a...)) - } -} - -func createGrpcServer(pool *pgxpool.Pool) { - - lis, err := net.Listen("tcp", ":8080") - if err != nil { - fmt.Printf("failed to listen: %v\n", err) - return - } - grpcServer := grpc.NewServer() - - repo, err := marketplace.RegisterAdapterGRPC(grpcServer, pool) - if err != nil { - fmt.Printf("failed to register gRPC server: %v\n", err) - return - } - _, err = ozon.RegisterAdapterGRPC(grpcServer, *repo) - if err != nil { - fmt.Printf("failed to register Ozon Products gRPC server: %v\n", err) - return - } - _, err = wb.RegisterAdapterGRPC(grpcServer, *repo) - if err != nil { - fmt.Printf("failed to register Wildberries Products gRPC server: %v\n", err) - return - } - - _, err = ym.RegisterAdapterGRPC(grpcServer, *repo) - if err != nil { - fmt.Printf("failed to register Yandex Market Products gRPC server: %v\n", err) - return - - } - - fmt.Println("gRPC server registered successfully.") - // Start serving gRPC requests - fmt.Println("gRPC server is starting on port 8080...") - - if err := grpcServer.Serve(lis); err != nil { - fmt.Printf("failed to serve: %v\n", err) - } - fmt.Println("gRPC server created.") -} - -func initDotenv() error { - err := godotenv.Load() - if err != nil { - return fmt.Errorf("error loading .env file: %w", err) - } - logMessage("info", "Dotenv file loaded successfully. 🌱") - return nil -} - -func initRedisClient(ctx context.Context) error { - err := redis.InitClient(ctx) - if err != nil { - return fmt.Errorf("error initializing Redis client: %w", err) - } - //defer redis.CloseClient() - logMessage("info", "Redis client initialized successfully. 🟥") - return nil -} -func initRedisLocker() error { - err := redis.InitLocker() - if err != nil { - return fmt.Errorf("error initializing Redis locker: %w", err) - } - logMessage("info", "Redis locker initialized successfully. 🟥") - return nil -} - func main() { - err := initDotenv() - if err != nil { - logMessage("error", "Failed to load .env file: %v", err) - } - logMessage("info", "Starting the SIPRO Marketplace Server... 🚀1") - ctx := context.Background() - - // Initializing the Redis client - err = initRedisClient(ctx) - if err != nil { - logMessage("error", "Failed to initialize Redis client: %v", err) - return - } - defer redis.CloseClient() - - // Initializing the Redis locker - err = initRedisLocker() - if err != nil { - logMessage("error", "Failed to initialize Redis locker: %v", err) - return - } - defer redis.CloseLocker() - cfg, err := config.LoadConfig() - if err != nil { - logMessage("error", "Failed to load configuration: %v", err) - return - } - - client.InitClient(*cfg.Redis) - // Initializing pgx connection - dbpool, err := pgxpool.New(ctx, os.Getenv("POSTGRES_URL")) - if err != nil { - logMessage("error", "Failed to connect to PostgreSQL: %v", err) - return - } - defer dbpool.Close() - - createGrpcServer(dbpool) + fx.New( + config.Module, + redis.Module, + db.Module, + grpc.Module, + marketplace.Module, + ozon_products.Module, + ).Run() } diff --git a/go.mod b/go.mod index 4a62bc4..a938e41 100755 --- a/go.mod +++ b/go.mod @@ -77,6 +77,8 @@ require ( github.com/wasilibs/wazero-helpers v0.0.0-20240620070341-3dff1577cd52 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.uber.org/atomic v1.11.0 // indirect + go.uber.org/dig v1.19.0 // indirect + go.uber.org/fx v1.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.41.0 // indirect diff --git a/go.sum b/go.sum index 1bf000c..66203ff 100755 --- a/go.sum +++ b/go.sum @@ -178,6 +178,10 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4= +go.uber.org/dig v1.19.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= +go.uber.org/fx v1.24.0 h1:wE8mruvpg2kiiL1Vqd0CC+tr0/24XIB10Iwp2lLWzkg= +go.uber.org/fx v1.24.0/go.mod h1:AmDeGyS+ZARGKM4tlH4FY2Jr63VjbEDJHtqXTGP5hbo= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/internal/config/config.go b/internal/config/config.go index 451f688..9513c7e 100755 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,9 +7,10 @@ type Config struct { Database *DatabaseConfig } -func LoadConfig() (*Config, error) { +func LoadConfig() (Config, error) { + println("Config loaded") _ = godotenv.Load() redisConfig := LoadRedisConfig() databaseConfig := LoadDatabaseConfig() - return &Config{Redis: redisConfig, Database: databaseConfig}, nil + return Config{Redis: redisConfig, Database: databaseConfig}, nil } diff --git a/internal/config/fx.go b/internal/config/fx.go new file mode 100644 index 0000000..a22d3f0 --- /dev/null +++ b/internal/config/fx.go @@ -0,0 +1,7 @@ +package config + +import "go.uber.org/fx" + +var Module = fx.Options( + fx.Provide(LoadConfig), +) diff --git a/internal/db/conn.go b/internal/db/conn.go index 6dc2112..65e9a99 100755 --- a/internal/db/conn.go +++ b/internal/db/conn.go @@ -1,17 +1,26 @@ package db import ( - "database/sql" + "context" + "sipro-mps/internal/config" + + "github.com/jackc/pgx/v5/pgxpool" _ "github.com/lib/pq" + "go.uber.org/fx" ) -func NewConnection(dsn string) (*sql.DB, error) { - db, err := sql.Open("postgres", dsn) +func NewPgxPool(lc fx.Lifecycle, config config.Config) (*pgxpool.Pool, error) { + ctx := context.Background() + pool, err := pgxpool.New(ctx, config.Database.URL) if err != nil { return nil, err } - if err := db.Ping(); err != nil { - return nil, err - } - return db, nil + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + pool.Close() + return nil + }, + }) + + return pool, nil } diff --git a/internal/db/fx.go b/internal/db/fx.go new file mode 100644 index 0000000..4131bf9 --- /dev/null +++ b/internal/db/fx.go @@ -0,0 +1,16 @@ +package db + +import ( + "sipro-mps/internal/marketplace/db" + + "go.uber.org/fx" +) + +var Module = fx.Options( + fx.Provide( + fx.Annotate( + NewPgxPool, + fx.As(new(db.DBTX)), + ), + ), +) diff --git a/internal/marketplace/adapter_grpc.go b/internal/marketplace/adapter_grpc.go index e7afe59..7dccba3 100644 --- a/internal/marketplace/adapter_grpc.go +++ b/internal/marketplace/adapter_grpc.go @@ -2,11 +2,11 @@ package marketplace import ( "context" + pb "sipro-mps/api/generated/v1/marketplace" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - pb "sipro-mps/api/generated/v1/marketplace" - "sipro-mps/internal/marketplace/db" ) // AdapterGRPC implements the gRPC server for the Marketplace service. @@ -20,15 +20,8 @@ func NewAdapterGRPC(repo Repository) *AdapterGRPC { repo: repo, } } -func RegisterAdapterGRPC(server *grpc.Server, conn db.DBTX) (*Repository, error) { - //conn, err := pgx.Connect(context.Background(), "postgresql://postgres:GjitkeYf%5Beq@/sipro?host=/run/postgresql") - //if err != nil { - // return nil, err - //} - repo := NewDBRepository(conn) - adapter := NewAdapterGRPC(repo) - pb.RegisterMarketplaceServiceServer(server, adapter) - return &repo, nil +func Register(server *grpc.Server, repo Repository) { + pb.RegisterMarketplaceServiceServer(server, NewAdapterGRPC(repo)) } func (g *AdapterGRPC) GetMarketplaceById(ctx context.Context, r *pb.GetMarketplaceByIdRequest) (*pb.Marketplace, error) { diff --git a/internal/marketplace/fx.go b/internal/marketplace/fx.go new file mode 100644 index 0000000..a3817e7 --- /dev/null +++ b/internal/marketplace/fx.go @@ -0,0 +1,12 @@ +package marketplace + +import "go.uber.org/fx" + +var Module = fx.Options( + fx.Provide( + NewDBRepository, + ), + fx.Invoke( + Register, + ), +) diff --git a/internal/ozon/common.go b/internal/ozon/common.go index c11178c..0494f8b 100644 --- a/internal/ozon/common.go +++ b/internal/ozon/common.go @@ -2,13 +2,15 @@ package ozon import ( "errors" - "git.denco.store/fakz9/ozon-api-client/ozon" - "github.com/tidwall/gjson" "net/http" "sipro-mps/internal/marketplace" + + "git.denco.store/fakz9/ozon-api-client/ozon" + "github.com/redis/rueidis" + "github.com/tidwall/gjson" ) -func GetClientFromMarketplace(mp *marketplace.Marketplace) (*ozon.Client, error) { +func GetClientFromMarketplace(redis rueidis.Client, mp *marketplace.Marketplace) (*ozon.Client, error) { authDataParsed := gjson.Parse(mp.AuthData) clientIdResult := authDataParsed.Get("clientId") @@ -19,7 +21,7 @@ func GetClientFromMarketplace(mp *marketplace.Marketplace) (*ozon.Client, error) apiKey := apiKeyResult.String() clientId := clientIdResult.String() httpClient := &http.Client{ - Transport: NewRateLimitTransport(), + Transport: NewRateLimitTransport(redis), } opts := []ozon.ClientOption{ ozon.WithAPIKey(apiKey), diff --git a/internal/ozon/products/adapter_grpc.go b/internal/ozon/products/adapter_grpc.go index b93861b..f5954dc 100644 --- a/internal/ozon/products/adapter_grpc.go +++ b/internal/ozon/products/adapter_grpc.go @@ -3,10 +3,10 @@ package products import ( "context" "fmt" + pb "sipro-mps/api/generated/v1/ozon/products" + "github.com/samber/lo" "google.golang.org/grpc" - pb "sipro-mps/api/generated/v1/ozon/products" - "sipro-mps/internal/marketplace" ) type AdapterGRPC struct { @@ -20,13 +20,10 @@ func NewAdapterGRPC(repo Repository) *AdapterGRPC { } } -// RegisterAdapterGRPC registers the gRPC server for the Products service. -func RegisterAdapterGRPC(server *grpc.Server, marketplaceRepo marketplace.Repository) (repo *Repository, err error) { - apiRepo := NewAPIRepository(marketplaceRepo) - adapter := NewAdapterGRPC(apiRepo) - pb.RegisterProductsServiceServer(server, adapter) - return &apiRepo, nil +func Register(server *grpc.Server, repo Repository) { + pb.RegisterProductsServiceServer(server, NewAdapterGRPC(repo)) } + func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error { ctx := stream.Context() fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId) diff --git a/internal/ozon/products/fx.go b/internal/ozon/products/fx.go new file mode 100644 index 0000000..f79395c --- /dev/null +++ b/internal/ozon/products/fx.go @@ -0,0 +1,8 @@ +package products + +import "go.uber.org/fx" + +var Module = fx.Options( + fx.Provide(NewAPIRepository), + fx.Invoke(Register), +) diff --git a/internal/ozon/products/repository_api.go b/internal/ozon/products/repository_api.go index cb2bb30..b24381f 100644 --- a/internal/ozon/products/repository_api.go +++ b/internal/ozon/products/repository_api.go @@ -7,26 +7,28 @@ import ( "sipro-mps/internal/marketplace" "sipro-mps/internal/ozon" "sipro-mps/internal/ozon/products/mapping/generated" - "sipro-mps/internal/redis" "sipro-mps/pkg/utils" "strconv" "sync" api "git.denco.store/fakz9/ozon-api-client/ozon" + "github.com/redis/rueidis" "github.com/samber/lo" ) type apiRepository struct { marketplaceRepository marketplace.Repository + redis rueidis.Client } func GetProductsKey(identifier string) string { return fmt.Sprintf("ozon:products:%s:compressed", identifier) } -func NewAPIRepository(marketplaceRepository marketplace.Repository) Repository { +func NewAPIRepository(marketplaceRepository marketplace.Repository, redis rueidis.Client) Repository { return &apiRepository{ marketplaceRepository: marketplaceRepository, + redis: redis, } } @@ -91,7 +93,7 @@ func (a *apiRepository) GetAllProducts(ctx context.Context, marketplaceId int) ( if err != nil { return nil, err } - clientFromMarketplace, err := ozon.GetClientFromMarketplace(mp) + clientFromMarketplace, err := ozon.GetClientFromMarketplace(a.redis, mp) if err != nil { return nil, err } @@ -115,7 +117,7 @@ func (a *apiRepository) StreamAllProducts(ctx context.Context, marketplaceId int errChan <- err return } - clientFromMarketplace, err := ozon.GetClientFromMarketplace(mp) + clientFromMarketplace, err := ozon.GetClientFromMarketplace(a.redis, mp) if err != nil { errChan <- err return @@ -128,31 +130,31 @@ func (a *apiRepository) StreamAllProducts(ctx context.Context, marketplaceId int func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []PbProduct, errChan chan<- error) { defer close(resultChan) defer close(errChan) - mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId) + _, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId) if err != nil { errChan <- err return } - identifier, err := mp.GetIdentifier() + //identifier, err := mp.GetIdentifier() if err != nil { errChan <- fmt.Errorf("getting marketplace identifier: %w", err) return } - key := GetProductsKey(identifier) + //key := GetProductsKey(identifier) var cachedMessage pb.GetListOfProductsResponse - err = redis.ReadProtoMessage(ctx, key, &cachedMessage) + //err = a.redis.ReadProtoMessage(ctx, key, &cachedMessage) if err == nil && len(cachedMessage.Products) > 0 { resultChan <- utils.DerefSlice(cachedMessage.Products) //_ = client.EnqueueFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId) return } - locker := *redis.Locker - _, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ozon:products:marketplace:%s:lock", key)) + //locker := *redis.Locker + //_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ozon:products:marketplace:%s:lock", key)) if err != nil { return } - defer cancel() + //defer cancel() innerResultChan := make(chan []OzonProduct) innerErrChan := make(chan error) @@ -164,8 +166,8 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI if len(allProducts) == 0 { return } - message := pb.GetListOfProductsResponse{Products: utils.ToPtrs(allProducts)} - _ = redis.WriteProtoMessage(ctx, key, &message) + //message := pb.GetListOfProductsResponse{Products: utils.ToPtrs(allProducts)} + //_ = redis.WriteProtoMessage(ctx, key, &message) }() for { select { @@ -202,23 +204,23 @@ func (a *apiRepository) StreamProductAttributesCache(ctx context.Context, market errChan <- err return } - identifier, err := mp.GetIdentifier() - if err != nil { - errChan <- fmt.Errorf("getting marketplace identifier: %w", err) - return - } - ozonClient, err := ozon.GetClientFromMarketplace(mp) + //identifier, err := mp.GetIdentifier() + //if err != nil { + // errChan <- fmt.Errorf("getting marketplace identifier: %w", err) + // return + //} + ozonClient, err := ozon.GetClientFromMarketplace(a.redis, mp) if err != nil { errChan <- err return } - key := fmt.Sprintf("ozon:product_attributes:%s:lock", identifier) - locker := *redis.Locker - _, cancel, err := locker.WithContext(ctx, key) - if err != nil { - return - } - defer cancel() + //key := fmt.Sprintf("ozon:product_attributes:%s:lock", identifier) + //locker := *redis.Locker + //_, cancel, err := locker.WithContext(ctx, key) + //if err != nil { + // return + //} + //defer cancel() converter := generated.ConverterImpl{} @@ -253,24 +255,24 @@ func (a *apiRepository) DeleteProducts(ctx context.Context, marketplaceId int, i return nil, err } - identifier, err := mp.GetIdentifier() - if err != nil { - return nil, fmt.Errorf("getting marketplace identifier: %w", err) - } + //identifier, err := mp.GetIdentifier() + //if err != nil { + // return nil, fmt.Errorf("getting marketplace identifier: %w", err) + //} - ozonClient, err := ozon.GetClientFromMarketplace(mp) + ozonClient, err := ozon.GetClientFromMarketplace(a.redis, mp) if err != nil { return nil, err } - key := fmt.Sprintf("ozon:products_delete:%s:lock", identifier) - locker := *redis.Locker - _, cancel, err := locker.WithContext(ctx, key) - if err != nil { - return nil, err - - } - defer cancel() + //key := fmt.Sprintf("ozon:products_delete:%s:lock", identifier) + //locker := *redis.Locker + //_, cancel, err := locker.WithContext(ctx, key) + //if err != nil { + // return nil, err + // + //} + //defer cancel() // Step 1: map the items into a slice mapped := lo.Map(items, func(item *PbDeleteProductRequestItem, _ int) *PbDeleteProductResponseItem { @@ -322,24 +324,24 @@ func (a *apiRepository) CreateOrUpdateProducts(ctx context.Context, marketplaceI return nil, err } - ozonClient, err := ozon.GetClientFromMarketplace(mp) + ozonClient, err := ozon.GetClientFromMarketplace(a.redis, mp) if err != nil { return nil, err } - identifier, err := mp.GetIdentifier() - if err != nil { - return nil, fmt.Errorf("getting marketplace identifier: %w", err) + //identifier, err := mp.GetIdentifier() + //if err != nil { + // return nil, fmt.Errorf("getting marketplace identifier: %w", err) + // + //} - } - - key := fmt.Sprintf("ozon:products_create_update:%s:lock", identifier) - locker := *redis.Locker - _, cancel, err := locker.WithContext(ctx, key) - if err != nil { - return nil, err - - } - defer cancel() + //key := fmt.Sprintf("ozon:products_create_update:%s:lock", identifier) + //locker := *redis.Locker + //_, cancel, err := locker.WithContext(ctx, key) + //if err != nil { + // return nil, err + // + //} + //defer cancel() converter := generated.ConverterImpl{} pageSize := 100 diff --git a/internal/ozon/rate_limiter.go b/internal/ozon/rate_limiter.go index 264339d..998699d 100644 --- a/internal/ozon/rate_limiter.go +++ b/internal/ozon/rate_limiter.go @@ -2,10 +2,10 @@ package ozon import ( "fmt" - "github.com/redis/rueidis" "net/http" - "sipro-mps/internal/redis" "time" + + "github.com/redis/rueidis" ) const ( @@ -40,6 +40,7 @@ var ( type RateLimitTransport struct { http.RoundTripper + redis rueidis.Client } func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) { @@ -47,7 +48,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error clientId := req.Header.Get("Client-Id") now := time.Now().UnixNano() - waitTime, err := rateLimiterScript.Exec(ctx, *redis.Client, []string{clientId}, []string{ + waitTime, err := rateLimiterScript.Exec(ctx, t.redis, []string{clientId}, []string{ fmt.Sprintf("%d", now), fmt.Sprintf("%d", int64(windowSize)), fmt.Sprintf("%d", rps), @@ -61,7 +62,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error } return t.RoundTripper.RoundTrip(req) } -func NewRateLimitTransport() *RateLimitTransport { +func NewRateLimitTransport(redis rueidis.Client) *RateLimitTransport { - return &RateLimitTransport{RoundTripper: http.DefaultTransport} + return &RateLimitTransport{RoundTripper: http.DefaultTransport, redis: redis} } diff --git a/internal/redis/client.go b/internal/redis/client.go index 6b58a9d..40cdea6 100644 --- a/internal/redis/client.go +++ b/internal/redis/client.go @@ -1,132 +1,126 @@ package redis import ( - "bytes" - "compress/flate" - "compress/zlib" "context" - "fmt" - "io" - "os" - "time" + "sipro-mps/internal/config" - "github.com/golang/protobuf/proto" "github.com/redis/rueidis" + "go.uber.org/fx" ) -var Client *rueidis.Client - -func InitClient(ctx context.Context) error { +func NewRedisClient(lc fx.Lifecycle, config config.Config) (rueidis.Client, error) { var err error - host := os.Getenv("REDIS_HOST") - //host := "redis" - port := os.Getenv("REDIS_PORT") - password := os.Getenv("REDIS_PASSWORD") + host := config.Redis.Host + port := config.Redis.Port + password := config.Redis.Password client, err := rueidis.NewClient(rueidis.ClientOption{ InitAddress: []string{host + ":" + port}, Password: password, }) if err != nil { - return err + return nil, err } - err = client.Do(ctx, client.B().Ping().Build()).Error() - if err != nil { - return err - } - Client = &client - return nil + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return client.Do(ctx, client.B().Ping().Build()).Error() + }, + OnStop: func(ctx context.Context) error { + client.Close() + return nil + }, + }) + return client, nil } -func CloseClient() { - if Client != nil { - (*Client).Close() - } -} - -// WriteProtoMessage compresses and writes a protobuf message to Redis -func WriteProtoMessage(ctx context.Context, key string, message proto.Message, ttl ...time.Duration) error { - if Client == nil { - return fmt.Errorf("redis client not initialized") - } - if message == nil { - return fmt.Errorf("message is nil") - } - - // Marshal protobuf message - bytesMessage, err := proto.Marshal(message) - if err != nil { - return fmt.Errorf("failed to marshal proto message: %w", err) - } - - // Compress with zlib - var buf bytes.Buffer - w, err := zlib.NewWriterLevel(&buf, flate.BestCompression) - if err != nil { - return fmt.Errorf("failed to create zlib writer: %w", err) - } - defer w.Close() // гарантированное закрытие - - if _, err := w.Write(bytesMessage); err != nil { - return fmt.Errorf("failed to write to zlib writer: %w", err) - } - if err := w.Close(); err != nil { // финализируем сжатие - return fmt.Errorf("failed to close zlib writer: %w", err) - } - - var ttlDuration time.Duration - if len(ttl) > 0 { - ttlDuration = ttl[0] - } else { - ttlDuration = 3 * time.Hour // Default TTL of 24 hours - } - // Write to Redis - if err := (*Client).Do(ctx, (*Client).B(). - Set(). - Key(key). - Value(rueidis.BinaryString(buf.Bytes())). - Ex(ttlDuration). - Build()). - Error(); err != nil { - return fmt.Errorf("failed to write compressed data to Redis: %w", err) - } - return nil -} - -// ReadProtoMessage reads and decompresses a protobuf message from Redis -func ReadProtoMessage(ctx context.Context, key string, message proto.Message) error { - if Client == nil { - return fmt.Errorf("redis client not initialized") - } - if message == nil { - return fmt.Errorf("message is nil") - } - - // Get bytes from Redis - resp, err := (*Client).Do(ctx, (*Client).B().Get().Key(key).Build()).AsBytes() - if err != nil { - return fmt.Errorf("failed to read data from Redis: %w", err) - } - if resp == nil { - return fmt.Errorf("no data found for key: %s", key) - } - - // Decompress - reader, err := zlib.NewReader(bytes.NewReader(resp)) - if err != nil { - return fmt.Errorf("failed to create zlib reader: %w", err) - } - defer reader.Close() - - decompressed, err := io.ReadAll(reader) - if err != nil { - return fmt.Errorf("failed to decompress data: %w", err) - } - - // Unmarshal protobuf - if err := proto.Unmarshal(decompressed, message); err != nil { - return fmt.Errorf("failed to unmarshal proto message: %w", err) - } - - return nil -} +//func CloseClient() { +// if Client != nil { +// (*Client).Close() +// } +//} +// +//// WriteProtoMessage compresses and writes a protobuf message to Redis +//func WriteProtoMessage(ctx context.Context, key string, message proto.Message, ttl ...time.Duration) error { +// if Client == nil { +// return fmt.Errorf("redis client not initialized") +// } +// if message == nil { +// return fmt.Errorf("message is nil") +// } +// +// // Marshal protobuf message +// bytesMessage, err := proto.Marshal(message) +// if err != nil { +// return fmt.Errorf("failed to marshal proto message: %w", err) +// } +// +// // Compress with zlib +// var buf bytes.Buffer +// w, err := zlib.NewWriterLevel(&buf, flate.BestCompression) +// if err != nil { +// return fmt.Errorf("failed to create zlib writer: %w", err) +// } +// defer w.Close() // гарантированное закрытие +// +// if _, err := w.Write(bytesMessage); err != nil { +// return fmt.Errorf("failed to write to zlib writer: %w", err) +// } +// if err := w.Close(); err != nil { // финализируем сжатие +// return fmt.Errorf("failed to close zlib writer: %w", err) +// } +// +// //var ttlDuration time.Duration +// //if len(ttl) > 0 { +// // ttlDuration = ttl[0] +// //} else { +// // ttlDuration = 3 * time.Hour // Default TTL of 24 hours +// //} +// // Write to Redis +// if err := (*Client).Do(ctx, (*Client).B(). +// Set(). +// Key(key). +// Value(rueidis.BinaryString(buf.Bytes())). +// Build()). +// Error(); err != nil { +// return fmt.Errorf("failed to write compressed data to Redis: %w", err) +// } +// return nil +//} +// +//// ReadProtoMessage reads and decompresses a protobuf message from Redis +//func ReadProtoMessage(ctx context.Context, key string, message proto.Message) error { +// if Client == nil { +// return fmt.Errorf("redis client not initialized") +// } +// if message == nil { +// return fmt.Errorf("message is nil") +// } +// +// // Get bytes from Redis +// resp, err := (*Client).Do(ctx, (*Client).B().Get().Key(key).Build()).AsBytes() +// if err != nil { +// return fmt.Errorf("failed to read data from Redis: %w", err) +// } +// if resp == nil { +// return fmt.Errorf("no data found for key: %s", key) +// } +// +// // Decompress +// reader, err := zlib.NewReader(bytes.NewReader(resp)) +// if err != nil { +// return fmt.Errorf("failed to create zlib reader: %w", err) +// } +// defer reader.Close() +// +// decompressed, err := io.ReadAll(reader) +// if err != nil { +// return fmt.Errorf("failed to decompress data: %w", err) +// } +// +// // Unmarshal protobuf +// if err := proto.Unmarshal(decompressed, message); err != nil { +// return fmt.Errorf("failed to unmarshal proto message: %w", err) +// } +// +// return nil +//} diff --git a/internal/redis/fx.go b/internal/redis/fx.go new file mode 100644 index 0000000..b1d9f31 --- /dev/null +++ b/internal/redis/fx.go @@ -0,0 +1,8 @@ +package redis + +import "go.uber.org/fx" + +var Module = fx.Options( + fx.Provide(NewRedisClient), + fx.Provide(NewRedisLocker), +) diff --git a/internal/redis/lock.go b/internal/redis/lock.go index 58eb078..0d8a950 100644 --- a/internal/redis/lock.go +++ b/internal/redis/lock.go @@ -1,30 +1,29 @@ package redis import ( + "context" "os" "github.com/redis/rueidis" "github.com/redis/rueidis/rueidislock" + "go.uber.org/fx" ) -var Locker *rueidislock.Locker - -func InitLocker() error { +func NewRedisLocker(lc fx.Lifecycle) (rueidislock.Locker, error) { redisAddr := os.Getenv("REDIS_ADDR") password := os.Getenv("REDIS_PASSWORD") locker, err := rueidislock.NewLocker(rueidislock.LockerOption{ ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password}, }) if err != nil { - return err + return nil, err } - Locker = &locker - return nil -} -func CloseLocker() { - if Locker != nil { - (*Locker).Close() - } - Locker = nil + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + locker.Close() + return nil + }, + }) + return locker, nil } diff --git a/internal/redis/utils.go b/internal/redis/utils.go new file mode 100644 index 0000000..079079d --- /dev/null +++ b/internal/redis/utils.go @@ -0,0 +1,33 @@ +package redis + +// +//func WriteString(ctx context.Context, key string, value string, ttl ...time.Duration) error { +// if Client == nil { +// return rueidis.Nil +// } +// var expiration time.Duration +// if len(ttl) > 0 { +// expiration = ttl[0] +// +// } +// return (*Client).Do(ctx, (*Client).B(). +// Set(). +// Key(key). +// Value(value). +// Ex(expiration). +// Build()).Error() +//} +// +//func ReadString(ctx context.Context, key string) (string, error) { +// if Client == nil { +// return "", rueidis.Nil +// } +// resp := (*Client).Do(ctx, (*Client).B(). +// Get(). +// Key(key). +// Build()) +// if resp.Error() != nil { +// return "", resp.Error() +// } +// return resp.ToString() +//} diff --git a/internal/transport/grpc/fx.go b/internal/transport/grpc/fx.go new file mode 100644 index 0000000..a135354 --- /dev/null +++ b/internal/transport/grpc/fx.go @@ -0,0 +1,7 @@ +package grpc + +import "go.uber.org/fx" + +var Module = fx.Options( + fx.Provide(NewGrpcServer), +) diff --git a/internal/transport/grpc/server.go b/internal/transport/grpc/server.go new file mode 100644 index 0000000..c004e04 --- /dev/null +++ b/internal/transport/grpc/server.go @@ -0,0 +1,29 @@ +package grpc + +import ( + "context" + "net" + + "go.uber.org/fx" + "google.golang.org/grpc" +) + +func NewGrpcServer(lc fx.Lifecycle) *grpc.Server { + server := grpc.NewServer() + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + lis, err := net.Listen("tcp", ":8080") + if err != nil { + return err + } + println("Running server on :8080") + go server.Serve(lis) + return nil + }, + OnStop: func(ctx context.Context) error { + server.GracefulStop() + return nil + }, + }) + return server +} diff --git a/internal/wb/rate_limiter.go b/internal/wb/rate_limiter.go index 51f41c6..e7728cc 100644 --- a/internal/wb/rate_limiter.go +++ b/internal/wb/rate_limiter.go @@ -64,6 +64,7 @@ end type RateLimitTransport struct { http.RoundTripper + redis rueidis.Client } func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) { @@ -84,7 +85,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error return nil, fmt.Errorf("sellerId is required in JWT claims") } now := time.Now().UnixMilli() - client := *redis.Client + client := t.redis waitTime, err := tokenBucketScript.Exec(ctx, client, []string{sellerId}, []string{ fmt.Sprintf("%d", now), @@ -113,7 +114,7 @@ func SyncRateLimitRemaining(ctx context.Context, sellerId string, remaining int) return fmt.Errorf("invalid sellerId or remaining") } now := time.Now().UnixMilli() - client := *redis.Client + client := cmds := []rueidis.Completed{ client.B().Set().Key(sellerId + ":capacity").Value(fmt.Sprintf("%d", defaultBucketCapacity)).Ex(time.Minute).Build(), @@ -167,6 +168,6 @@ func SetRateLimitRetry(ctx context.Context, sellerId string, retrySeconds int, l return nil } -func NewRateLimitTransport() *RateLimitTransport { +func NewRateLimitTransport(client rueidis.Client) *RateLimitTransport { return &RateLimitTransport{RoundTripper: http.DefaultTransport} }