package ozon

import (
	"context"
	"fmt"

	"github.com/hibiken/asynq"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/samber/lo"

	pb "sipro-mps/api/generated/v1/ozon/products"
	"sipro-mps/internal/marketplace"
	"sipro-mps/internal/ozon/products"
	"sipro-mps/internal/ozon/products/mapping/generated"
	"sipro-mps/internal/redis"
	"sipro-mps/internal/tasks/types"
)

// FetchProductsProcessor is the asynq handler that loads every product of a
// marketplace through the Ozon API repository and stores the result in Redis
// as a single protobuf message.
type FetchProductsProcessor struct {
	// Dbpool is the connection pool used to build the marketplace repository.
	Dbpool *pgxpool.Pool
}

// ProcessTask decodes a types.FetchProductsTask from task, resolves the
// marketplace it refers to, and refreshes that marketplace's product list in
// Redis while holding a per-marketplace lock.
//
// Return values:
//   - nil when the products were written, or when the lock could not be
//     acquired (the failure is logged and the task is treated as done).
//   - an error wrapping asynq.SkipRetry when the payload cannot be decoded,
//     the marketplace cannot be loaded, or its identifier cannot be derived.
//     These paths were never retried before; wrapping SkipRetry keeps that
//     while making the failure visible instead of reporting success.
//   - a wrapped error when fetching the products fails.
//   - asynq.RevokeTask when writing the result to Redis fails.
func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Task) error {
	var payload types.FetchProductsTask
	if err := payload.Unmarshal(task); err != nil {
		return fmt.Errorf("decoding fetch-products payload: %v: %w", err, asynq.SkipRetry)
	}

	marketplaceRepo := marketplace.NewDBRepository(p.Dbpool)
	marketplaceById, err := marketplaceRepo.GetMarketplaceByID(ctx, payload.MarketplaceId)
	if err != nil {
		return fmt.Errorf("loading marketplace %d: %v: %w", payload.MarketplaceId, err, asynq.SkipRetry)
	}

	identifier, err := marketplaceById.GetIdentifier()
	if err != nil {
		return fmt.Errorf("resolving identifier of marketplace %d: %v: %w", payload.MarketplaceId, err, asynq.SkipRetry)
	}

	// Guard the refresh with a per-marketplace lock. Failing to take the lock
	// is logged and reported as success rather than as a task error.
	// NOTE(review): the first value returned by TryWithContext is discarded;
	// if it is a lock-scoped context, consider passing it to the calls below.
	lockKey := fmt.Sprintf("ozon:products:marketplace:%s:lock", identifier)
	locker := *redis.Locker
	_, cancel, err := locker.TryWithContext(ctx, lockKey)
	if err != nil {
		fmt.Printf("Failed to acquire lock for marketplace %s: %v\n", identifier, err)
		return nil
	}
	defer cancel()

	ozonRepo := products.NewAPIRepository(marketplaceRepo)
	productsRaw, err := ozonRepo.GetAllProducts(ctx, payload.MarketplaceId)
	if err != nil {
		return fmt.Errorf("failed to fetch products for marketplace %d: %w", payload.MarketplaceId, err)
	}

	// Convert every fetched product to its protobuf representation.
	converter := generated.ConverterImpl{}
	productsProto := lo.Map(productsRaw, func(item products.OzonProduct, _ int) *products.PbProduct {
		return converter.ToProto(&item)
	})

	redisKey := products.GetProductsKey(identifier)
	err = redis.WriteProtoMessage(ctx, redisKey, &pb.GetListOfProductsResponse{Products: productsProto})
	if err != nil {
		fmt.Printf("Failed to write products to Redis for marketplace %s: %v\n", identifier, err)
		return asynq.RevokeTask
	}
	return nil
}