Refactor marketplace product fetching and caching logic; update environment configuration for Redis and PostgreSQL
This commit is contained in:
66
internal/tasks/ozon/ozon.go
Normal file
66
internal/tasks/ozon/ozon.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package ozon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/samber/lo"
|
||||
"sipro-mps/internal/marketplace"
|
||||
"sipro-mps/internal/ozon/products/mapping/generated"
|
||||
"sipro-mps/internal/redis"
|
||||
"sipro-mps/internal/tasks/types"
|
||||
|
||||
"sipro-mps/internal/ozon/products"
|
||||
)
|
||||
|
||||
type FetchProductsProcessor struct {
|
||||
Dbpool *pgxpool.Pool
|
||||
}
|
||||
|
||||
func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Task) error {
|
||||
var payload types.FetchProductsTask
|
||||
err := payload.Unmarshal(task)
|
||||
if err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
marketplaceRepo := marketplace.NewDBRepository(p.Dbpool)
|
||||
marketplaceById, err := marketplaceRepo.GetMarketplaceByID(ctx, payload.MarketplaceId)
|
||||
if err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
identifier, err := marketplaceById.GetIdentifier()
|
||||
if err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
lockKey := fmt.Sprintf("ozon:products:marketplace:%s:lock", identifier)
|
||||
locker := *redis.Locker
|
||||
_, cancel, err := locker.TryWithContext(ctx, lockKey)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to acquire lock for marketplace %s: %v\n", identifier, err)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
defer cancel()
|
||||
ozonRepo := products.NewAPIRepository(marketplaceRepo)
|
||||
|
||||
productsRaw, err := ozonRepo.GetAllProducts(ctx, payload.MarketplaceId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch products for marketplace %d: %w", payload.MarketplaceId, err)
|
||||
}
|
||||
converter := generated.ConverterImpl{}
|
||||
productsProto := lo.Map(productsRaw, func(item products.OzonProduct, _ int) *products.PbProduct {
|
||||
return converter.ToProto(&item)
|
||||
})
|
||||
productsJson, err := json.Marshal(productsProto)
|
||||
if err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
redisClient := *redis.Client
|
||||
productsKey := fmt.Sprintf("ozon:products:%s", identifier)
|
||||
err = redisClient.Do(ctx, redisClient.B().Set().Key(productsKey).Value(string(productsJson)).Build()).Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"sipro-mps/internal/config"
|
||||
"sipro-mps/internal/tasks/ozon"
|
||||
"sipro-mps/internal/tasks/types"
|
||||
"sipro-mps/internal/tasks/wb"
|
||||
)
|
||||
@@ -24,13 +25,14 @@ func (s *AsynqServer) createMux() *asynq.ServeMux {
|
||||
|
||||
// Register task handlers here
|
||||
mux.Handle(types.TypeWbFetchProducts, &wb.FetchProductsProcessor{Dbpool: s.dbpool})
|
||||
mux.Handle(types.TypeOzonFetchProducts, &ozon.FetchProductsProcessor{Dbpool: s.dbpool})
|
||||
return mux
|
||||
}
|
||||
|
||||
func (s *AsynqServer) Run() {
|
||||
srv := asynq.NewServer(
|
||||
asynq.RedisClientOpt{Addr: s.redisConfig.Addr, Password: s.redisConfig.Password},
|
||||
asynq.Config{Concurrency: 10},
|
||||
asynq.Config{},
|
||||
)
|
||||
mux := s.createMux()
|
||||
if err := srv.Run(mux); err != nil {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package types
|
||||
|
||||
const (
|
||||
TypeWbFetchProducts = "wb:fetch_products"
|
||||
TypeWbFetchProducts = "wb:fetch_products"
|
||||
TypeOzonFetchProducts = "ozon:fetch_products"
|
||||
)
|
||||
|
||||
24
internal/tasks/types/types.go
Normal file
24
internal/tasks/types/types.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/hibiken/asynq"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FetchProductsTask struct {
|
||||
MarketplaceId int
|
||||
}
|
||||
|
||||
func (t *FetchProductsTask) Unmarshal(task *asynq.Task) error {
|
||||
return json.Unmarshal(task.Payload(), t)
|
||||
|
||||
}
|
||||
|
||||
func NewFetchProductsTask(taskType string, marketplaceId int) (*asynq.Task, error) {
|
||||
payload, err := json.Marshal(&FetchProductsTask{MarketplaceId: marketplaceId})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return asynq.NewTask(taskType, payload, asynq.MaxRetry(2), asynq.Timeout(20*time.Minute)), nil
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/hibiken/asynq"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FetchProductsTask struct {
|
||||
MarketplaceId int
|
||||
}
|
||||
|
||||
func NewFetchProductsTask(marketplaceId int) (*asynq.Task, error) {
|
||||
payload, err := json.Marshal(&FetchProductsTask{MarketplaceId: marketplaceId})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return asynq.NewTask(TypeWbFetchProducts, payload, asynq.MaxRetry(2), asynq.Timeout(20*time.Minute)), nil
|
||||
}
|
||||
@@ -8,16 +8,15 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/samber/lo"
|
||||
pb "sipro-mps/api/generated/v1/wb/products"
|
||||
mp_repo "sipro-mps/internal/marketplace"
|
||||
"sipro-mps/internal/marketplace"
|
||||
"sipro-mps/internal/redis"
|
||||
"sipro-mps/internal/tasks/types"
|
||||
wb_products_repo "sipro-mps/internal/wb/products"
|
||||
conv "sipro-mps/internal/wb/products/mapping/generated"
|
||||
"sipro-mps/internal/wb/products"
|
||||
"sipro-mps/internal/wb/products/mapping/generated"
|
||||
)
|
||||
|
||||
type FetchProductsProcessor struct {
|
||||
Dbpool *pgxpool.Pool
|
||||
wbRepo wb_products_repo.Repository
|
||||
}
|
||||
|
||||
func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Task) error {
|
||||
@@ -25,11 +24,15 @@ func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Ta
|
||||
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
marketplaceRepo := mp_repo.NewDBRepository(p.Dbpool)
|
||||
repo := wb_products_repo.NewAPIRepository(marketplaceRepo)
|
||||
_, sellerId, err := repo.ParseMarketplace(ctx, payload.MarketplaceId)
|
||||
marketplaceRepo := marketplace.NewDBRepository(p.Dbpool)
|
||||
repo := products.NewAPIRepository(marketplaceRepo)
|
||||
marketplaceById, err := marketplaceRepo.GetMarketplaceByID(ctx, payload.MarketplaceId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse marketplace %d: %w", payload.MarketplaceId, err)
|
||||
return fmt.Errorf("failed to get marketplace by ID %d: %w", payload.MarketplaceId, err)
|
||||
}
|
||||
sellerId, err := marketplaceById.GetIdentifier()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get identifier for marketplace %d: %w", payload.MarketplaceId, err)
|
||||
}
|
||||
locker := *redis.Locker
|
||||
_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("wb:products:marketplace:%s:lock", sellerId))
|
||||
@@ -47,12 +50,12 @@ func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Ta
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch products for marketplace %d: %w", payload.MarketplaceId, err)
|
||||
}
|
||||
converter := conv.ConverterImpl{}
|
||||
products := lo.Map(productsRaw, func(item wb_products_repo.WbProduct, _ int) *pb.Product {
|
||||
converter := generated.ConverterImpl{}
|
||||
productsProto := lo.Map(productsRaw, func(item products.WbProduct, _ int) *pb.Product {
|
||||
return converter.ToProto(&item)
|
||||
})
|
||||
redisClient := *redis.Client
|
||||
productsJson, err := json.Marshal(products)
|
||||
productsJson, err := json.Marshal(productsProto)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal products: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user