Add Wildberries product fetching and rate limiting functionality
This commit is contained in:
@@ -1,17 +1,18 @@
|
||||
package config
|
||||
|
||||
import "github.com/joho/godotenv"
|
||||
|
||||
type Config struct {
|
||||
DB string
|
||||
HTTP string
|
||||
GRPC string
|
||||
Kafka string
|
||||
Redis *RedisConfig
|
||||
Database *DatabaseConfig
|
||||
}
|
||||
|
||||
func Load() Config {
|
||||
return Config{
|
||||
DB: "dbname=test password=GjitkYf[eq user=postgres sslmode=disable",
|
||||
HTTP: ":8080",
|
||||
GRPC: ":50051",
|
||||
Kafka: "localhost:9092",
|
||||
func LoadConfig() (*Config, error) {
|
||||
err := godotenv.Load()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
redisConfig := LoadRedisConfig()
|
||||
databaseConfig := LoadDatabaseConfig()
|
||||
return &Config{Redis: redisConfig, Database: databaseConfig}, nil
|
||||
}
|
||||
|
||||
23
internal/config/database.go
Normal file
23
internal/config/database.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package config
|
||||
|
||||
import "os"
|
||||
|
||||
type DatabaseConfig struct {
|
||||
Host string
|
||||
Port string
|
||||
Login string
|
||||
Password string
|
||||
Database string
|
||||
URL string
|
||||
}
|
||||
|
||||
func LoadDatabaseConfig() *DatabaseConfig {
|
||||
return &DatabaseConfig{
|
||||
Host: os.Getenv("POSTGRES_HOST"),
|
||||
Port: os.Getenv("POSTGRES_PORT"),
|
||||
Login: os.Getenv("POSTGRES_LOGIN"),
|
||||
Password: os.Getenv("POSTGRES_PASSWORD"),
|
||||
Database: os.Getenv("POSTGRES_DATABASE"),
|
||||
URL: os.Getenv("POSTGRES_URL"),
|
||||
}
|
||||
}
|
||||
23
internal/config/redis.go
Normal file
23
internal/config/redis.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package config
|
||||
|
||||
import "os"
|
||||
|
||||
type RedisConfig struct {
|
||||
Host string
|
||||
Port string
|
||||
Password string
|
||||
Addr string
|
||||
}
|
||||
|
||||
func LoadRedisConfig() *RedisConfig {
|
||||
host := os.Getenv("REDIS_HOST")
|
||||
port := os.Getenv("REDIS_PORT")
|
||||
password := os.Getenv("REDIS_PASSWORD")
|
||||
addr := os.Getenv("REDIS_ADDR")
|
||||
return &RedisConfig{
|
||||
Host: host,
|
||||
Port: port,
|
||||
Password: password,
|
||||
Addr: addr,
|
||||
}
|
||||
}
|
||||
@@ -14,4 +14,5 @@ type Marketplace struct {
|
||||
Name string
|
||||
AuthData pgtype.Text
|
||||
WarehouseID pgtype.Text
|
||||
AuthDataJson []byte
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
)
|
||||
|
||||
const getMarketplaceByID = `-- name: GetMarketplaceByID :one
|
||||
SELECT id, base_marketplace, name, auth_data, warehouse_id FROM marketplaces
|
||||
SELECT id, base_marketplace, name, auth_data, warehouse_id, auth_data_json FROM marketplaces
|
||||
WHERE id = $1 LIMIT 1
|
||||
`
|
||||
|
||||
@@ -23,6 +23,7 @@ func (q *Queries) GetMarketplaceByID(ctx context.Context, id int32) (Marketplace
|
||||
&i.Name,
|
||||
&i.AuthData,
|
||||
&i.WarehouseID,
|
||||
&i.AuthDataJson,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
create table marketplaces
|
||||
(
|
||||
id serial
|
||||
id serial
|
||||
primary key,
|
||||
base_marketplace integer not null,
|
||||
name varchar not null,
|
||||
auth_data varchar,
|
||||
warehouse_id varchar
|
||||
base_marketplace integer not null,
|
||||
name varchar not null,
|
||||
auth_data varchar,
|
||||
warehouse_id varchar,
|
||||
auth_data_json jsonb generated always as (
|
||||
CASE
|
||||
WHEN ((auth_data)::text IS JSON) THEN (auth_data)::jsonb
|
||||
ELSE NULL::jsonb
|
||||
END) stored
|
||||
);
|
||||
@@ -5,4 +5,5 @@ type Marketplace struct {
|
||||
BaseMarketplace int `json:"base_marketplace"`
|
||||
AuthData string `json:"auth_data"`
|
||||
WarehouseID string `json:"warehouse_id"`
|
||||
AuthDataJson []byte `json:"auth_data_json,omitempty"`
|
||||
}
|
||||
|
||||
@@ -24,5 +24,6 @@ func (r *dbRepository) GetMarketplaceByID(ctx context.Context, id int) (*Marketp
|
||||
BaseMarketplace: int(marketplace.BaseMarketplace),
|
||||
AuthData: marketplace.AuthData.String,
|
||||
WarehouseID: marketplace.WarehouseID.String,
|
||||
AuthDataJson: marketplace.AuthDataJson,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
proto "sipro-mps/api/generated/v1/ozon/products"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/jmattheis/goverter/cmd/goverter gen -g 'ignoreUnexported yes' .
|
||||
//go:generate go run github.com/jmattheis/goverter/cmd/goverter gen -global "ignoreUnexported yes" .
|
||||
|
||||
// goverter:converter
|
||||
// goverter:extend Int632ToInt64
|
||||
|
||||
@@ -5,9 +5,8 @@ package generated
|
||||
|
||||
import (
|
||||
ozon "git.denco.store/fakz9/ozon-api-client/ozon"
|
||||
|
||||
products "sipro-mps/api/generated/v1/ozon/products"
|
||||
ozon "git.denco.store/fakz9/ozon-api-client/ozon"
|
||||
mapping "sipro-mps/internal/ozon/products/mapping"
|
||||
)
|
||||
|
||||
type ConverterImpl struct{}
|
||||
|
||||
@@ -85,7 +85,7 @@ func (a *apiRepository) GetAllProducts(ctx context.Context, marketplaceId int) (
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items := []OzonProduct{}
|
||||
var items []OzonProduct
|
||||
productIdsChan := make(chan []int64)
|
||||
producsChan := make(chan []OzonProduct)
|
||||
errChan := make(chan error)
|
||||
|
||||
@@ -50,7 +50,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error
|
||||
waitTime, err := rateLimiterScript.Exec(ctx, *redis.Client, []string{clientId}, []string{
|
||||
fmt.Sprintf("%d", now),
|
||||
fmt.Sprintf("%d", int64(windowSize)),
|
||||
fmt.Sprintf("%d", 50),
|
||||
fmt.Sprintf("%d", rps),
|
||||
}).ToInt64()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute rate limit script: %w", err)
|
||||
|
||||
@@ -11,6 +11,7 @@ var Client *rueidis.Client
|
||||
func InitClient(ctx context.Context) error {
|
||||
var err error
|
||||
host := os.Getenv("REDIS_HOST")
|
||||
//host := "redis"
|
||||
port := os.Getenv("REDIS_PORT")
|
||||
password := os.Getenv("REDIS_PASSWORD")
|
||||
|
||||
|
||||
29
internal/redis/lock.go
Normal file
29
internal/redis/lock.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"github.com/redis/rueidis"
|
||||
"github.com/redis/rueidis/rueidislock"
|
||||
"os"
|
||||
)
|
||||
|
||||
var Locker *rueidislock.Locker
|
||||
|
||||
func InitLocker() error {
|
||||
redisAddr := os.Getenv("REDIS_ADDR")
|
||||
password := os.Getenv("REDIS_PASSWORD")
|
||||
locker, err := rueidislock.NewLocker(rueidislock.LockerOption{
|
||||
ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Locker = &locker
|
||||
return nil
|
||||
}
|
||||
func CloseLocker() {
|
||||
if Locker != nil {
|
||||
(*Locker).Close()
|
||||
}
|
||||
Locker = nil
|
||||
|
||||
}
|
||||
25
internal/tasks/client/client.go
Normal file
25
internal/tasks/client/client.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"github.com/hibiken/asynq"
|
||||
"sipro-mps/internal/config"
|
||||
)
|
||||
|
||||
var Client *asynq.Client
|
||||
|
||||
// InitClient initializes the Asynq client with the provided Redis configuration.
|
||||
func InitClient(redisConfig config.RedisConfig) {
|
||||
client := asynq.NewClient(asynq.RedisClientOpt{
|
||||
Addr: redisConfig.Addr,
|
||||
Password: redisConfig.Password,
|
||||
})
|
||||
Client = client
|
||||
}
|
||||
|
||||
func CloseClient() {
|
||||
if Client != nil {
|
||||
if err := Client.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
39
internal/tasks/server/server.go
Normal file
39
internal/tasks/server/server.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"sipro-mps/internal/config"
|
||||
"sipro-mps/internal/tasks/types"
|
||||
"sipro-mps/internal/tasks/wb"
|
||||
)
|
||||
|
||||
type AsynqServer struct {
|
||||
redisConfig *config.RedisConfig
|
||||
dbpool *pgxpool.Pool
|
||||
}
|
||||
|
||||
func NewAsynqServer(redisConfig *config.RedisConfig, dbpool *pgxpool.Pool) *AsynqServer {
|
||||
return &AsynqServer{
|
||||
redisConfig: redisConfig,
|
||||
dbpool: dbpool,
|
||||
}
|
||||
}
|
||||
func (s *AsynqServer) createMux() *asynq.ServeMux {
|
||||
mux := asynq.NewServeMux()
|
||||
|
||||
// Register task handlers here
|
||||
mux.Handle(types.TypeWbFetchProducts, &wb.FetchProductsProcessor{Dbpool: s.dbpool})
|
||||
return mux
|
||||
}
|
||||
|
||||
func (s *AsynqServer) Run() {
|
||||
srv := asynq.NewServer(
|
||||
asynq.RedisClientOpt{Addr: s.redisConfig.Addr, Password: s.redisConfig.Password},
|
||||
asynq.Config{Concurrency: 10},
|
||||
)
|
||||
mux := s.createMux()
|
||||
if err := srv.Run(mux); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
5
internal/tasks/types/common.go
Normal file
5
internal/tasks/types/common.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package types
|
||||
|
||||
const (
|
||||
TypeWbFetchProducts = "wb:fetch_products"
|
||||
)
|
||||
19
internal/tasks/types/wb.go
Normal file
19
internal/tasks/types/wb.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/hibiken/asynq"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FetchProductsTask struct {
|
||||
MarketplaceId int
|
||||
}
|
||||
|
||||
func NewFetchProductsTask(marketplaceId int) (*asynq.Task, error) {
|
||||
payload, err := json.Marshal(&FetchProductsTask{MarketplaceId: marketplaceId})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return asynq.NewTask(TypeWbFetchProducts, payload, asynq.MaxRetry(2), asynq.Timeout(20*time.Minute)), nil
|
||||
}
|
||||
65
internal/tasks/wb/wb.go
Normal file
65
internal/tasks/wb/wb.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package wb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/samber/lo"
|
||||
pb "sipro-mps/api/generated/v1/wb/products"
|
||||
mp_repo "sipro-mps/internal/marketplace"
|
||||
"sipro-mps/internal/redis"
|
||||
"sipro-mps/internal/tasks/types"
|
||||
wb_products_repo "sipro-mps/internal/wb/products"
|
||||
conv "sipro-mps/internal/wb/products/mapping/generated"
|
||||
)
|
||||
|
||||
type FetchProductsProcessor struct {
|
||||
Dbpool *pgxpool.Pool
|
||||
wbRepo wb_products_repo.Repository
|
||||
}
|
||||
|
||||
func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Task) error {
|
||||
var payload types.FetchProductsTask
|
||||
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
marketplaceRepo := mp_repo.NewDBRepository(p.Dbpool)
|
||||
repo := wb_products_repo.NewAPIRepository(marketplaceRepo)
|
||||
_, sellerId, err := repo.ParseMarketplace(ctx, payload.MarketplaceId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse marketplace %d: %w", payload.MarketplaceId, err)
|
||||
}
|
||||
locker := *redis.Locker
|
||||
_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("wb:products:marketplace:%s:lock", sellerId))
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to acquire lock for marketplace %s: %v\n", sellerId, err)
|
||||
return asynq.SkipRetry
|
||||
|
||||
}
|
||||
fmt.Println("Working on marketplace", payload.MarketplaceId, "with seller ID", sellerId)
|
||||
|
||||
defer cancel()
|
||||
|
||||
redisKey := fmt.Sprintf("wb:products:%s", sellerId)
|
||||
productsRaw, err := repo.GetAllProducts(ctx, payload.MarketplaceId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch products for marketplace %d: %w", payload.MarketplaceId, err)
|
||||
}
|
||||
converter := conv.ConverterImpl{}
|
||||
products := lo.Map(productsRaw, func(item wb_products_repo.WbProduct, _ int) *pb.Product {
|
||||
return converter.ToProto(&item)
|
||||
})
|
||||
redisClient := *redis.Client
|
||||
productsJson, err := json.Marshal(products)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal products: %w", err)
|
||||
}
|
||||
err = redisClient.Do(ctx, redisClient.B().Set().Key(redisKey).Value(string(productsJson)).Build()).Error()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
59
internal/wb/common.go
Normal file
59
internal/wb/common.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package wb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
"net/http"
|
||||
"sipro-mps/internal/marketplace"
|
||||
wbclient "sipro-mps/pkg/api/wb/client"
|
||||
"time"
|
||||
)
|
||||
|
||||
type WbAuthData struct {
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
func NewWbAuthData(token string) WbAuthData {
|
||||
return WbAuthData{
|
||||
Token: token,
|
||||
}
|
||||
}
|
||||
|
||||
func DecodeWildberriesJwt(token []byte) (WbAuthData, jwt.MapClaims, error) {
|
||||
var authData WbAuthData
|
||||
err := json.Unmarshal(token, &authData)
|
||||
if err != nil {
|
||||
return authData, nil, fmt.Errorf("failed to unmarshal JWT: %w", err)
|
||||
}
|
||||
claims := jwt.MapClaims{}
|
||||
_, _, err = jwt.NewParser().ParseUnverified(authData.Token, claims)
|
||||
if err != nil {
|
||||
return authData, nil, fmt.Errorf("invalid JWT: %w", err)
|
||||
}
|
||||
|
||||
return authData, claims, nil
|
||||
}
|
||||
|
||||
func GetClientFromMarketplace(mp *marketplace.Marketplace) (*wbclient.Client, error) {
|
||||
|
||||
authData, claims, err := DecodeWildberriesJwt(mp.AuthDataJson)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode Wildberries JWT")
|
||||
}
|
||||
exp := claims["exp"].(float64)
|
||||
// chec if token is expired, for now unix date
|
||||
now := float64(time.Now().Unix())
|
||||
if exp < now {
|
||||
return nil, fmt.Errorf("token is expired")
|
||||
}
|
||||
securityHandler := NewWildberriesSecurityHandler(authData.Token)
|
||||
httpClient := &http.Client{
|
||||
Transport: NewRateLimitTransport(),
|
||||
}
|
||||
client, err := wbclient.NewClient("https://content-api.wildberries.ru", securityHandler, wbclient.WithClient(httpClient))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create Wildberries client: %w", err)
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
63
internal/wb/products/adapter_grpc.go
Normal file
63
internal/wb/products/adapter_grpc.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package products
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/samber/lo"
|
||||
"google.golang.org/grpc"
|
||||
pb "sipro-mps/api/generated/v1/wb/products"
|
||||
"sipro-mps/internal/marketplace"
|
||||
)
|
||||
|
||||
type AdapterGRPC struct {
|
||||
pb.UnimplementedProductsServiceServer
|
||||
repo Repository
|
||||
}
|
||||
|
||||
func NewAdapterGRPC(repo Repository) *AdapterGRPC {
|
||||
return &AdapterGRPC{
|
||||
repo: repo,
|
||||
}
|
||||
}
|
||||
|
||||
func RegisterAdapterGRPC(server *grpc.Server, marketplacesRepository marketplace.Repository) (*Repository, error) {
|
||||
repo := NewAPIRepository(marketplacesRepository)
|
||||
adapter := NewAdapterGRPC(repo)
|
||||
pb.RegisterProductsServiceServer(server, adapter)
|
||||
return &repo, nil
|
||||
}
|
||||
func (a *AdapterGRPC) GetProducts(req *pb.GetProductsRequest, stream pb.ProductsService_GetProductsServer) error {
|
||||
ctx := stream.Context()
|
||||
resultChan := make(chan []pb.Product)
|
||||
errChan := make(chan error)
|
||||
go a.repo.StreamAllProductsCache(ctx, int(req.MarketplaceId), resultChan, errChan)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("context done")
|
||||
return ctx.Err()
|
||||
case products, ok := <-resultChan:
|
||||
if !ok {
|
||||
fmt.Println("result channel closed")
|
||||
return nil
|
||||
}
|
||||
resp := &pb.GetProductsResponse{
|
||||
Products: lo.Map(products, func(p pb.Product, _ int) *pb.Product {
|
||||
return &p
|
||||
}),
|
||||
}
|
||||
if err := stream.Send(resp); err != nil {
|
||||
fmt.Println("error sending response", err)
|
||||
return err
|
||||
}
|
||||
case err, ok := <-errChan:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if ok && err != nil {
|
||||
fmt.Println("error in channel", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
9
internal/wb/products/entities.go
Normal file
9
internal/wb/products/entities.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package products
|
||||
|
||||
import (
|
||||
pb "sipro-mps/api/generated/v1/wb/products"
|
||||
"sipro-mps/pkg/api/wb/client"
|
||||
)
|
||||
|
||||
type WbProduct = api.ContentV2GetCardsListPostOKCardsItem
|
||||
type PbProduct = pb.Product
|
||||
28
internal/wb/products/mapping/converter.go
Normal file
28
internal/wb/products/mapping/converter.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package mapping
|
||||
|
||||
import wbclient "sipro-mps/pkg/api/wb/client"
|
||||
|
||||
// import (
|
||||
//
|
||||
// proto "sipro-mps/api/generated/v1/wb/products"
|
||||
// internal "sipro-mps/internal/wb/products"
|
||||
// wbclient "sipro-mps/pkg/api/wb/client"
|
||||
//
|
||||
// )
|
||||
//
|
||||
// //go:generate go run github.com/jmattheis/goverter/cmd/goverter gen -global "ignoreUnexported yes" .
|
||||
//
|
||||
// // goverter:converter
|
||||
// // goverter:extend OptIntToInt64 OptStringToString
|
||||
//
|
||||
// type Converter interface {
|
||||
// // goverter:ignore state sizeCache unknownFields
|
||||
//
|
||||
// ToProto(details *internal.WbProduct) *proto.Product
|
||||
// }
|
||||
func OptIntToInt64(i wbclient.OptInt) int64 {
|
||||
return int64(i.Value)
|
||||
}
|
||||
func OptStringToString(s wbclient.OptString) string {
|
||||
return s.Value
|
||||
}
|
||||
40
internal/wb/products/mapping/generated/generated.go
Normal file
40
internal/wb/products/mapping/generated/generated.go
Normal file
@@ -0,0 +1,40 @@
|
||||
// Code generated by github.com/jmattheis/goverter, DO NOT EDIT.
|
||||
//go:build !goverter
|
||||
|
||||
package generated
|
||||
|
||||
import (
|
||||
products "sipro-mps/api/generated/v1/wb/products"
|
||||
mapping "sipro-mps/internal/wb/products/mapping"
|
||||
client "sipro-mps/pkg/api/wb/client"
|
||||
)
|
||||
|
||||
type ConverterImpl struct{}
|
||||
|
||||
func (c *ConverterImpl) ToProto(source *client.ContentV2GetCardsListPostOKCardsItem) *products.Product {
|
||||
var pProductsProduct *products.Product
|
||||
if source != nil {
|
||||
var productsProduct products.Product
|
||||
productsProduct.NmID = mapping.OptIntToInt64((*source).NmID)
|
||||
productsProduct.SubjectID = mapping.OptIntToInt64((*source).SubjectID)
|
||||
productsProduct.VendorCode = mapping.OptStringToString((*source).VendorCode)
|
||||
if (*source).Sizes != nil {
|
||||
productsProduct.Sizes = make([]*products.Product_Size, len((*source).Sizes))
|
||||
for i := 0; i < len((*source).Sizes); i++ {
|
||||
productsProduct.Sizes[i] = c.apiContentV2GetCardsListPostOKCardsItemSizesItemToPProductsProduct_Size((*source).Sizes[i])
|
||||
}
|
||||
}
|
||||
pProductsProduct = &productsProduct
|
||||
}
|
||||
return pProductsProduct
|
||||
}
|
||||
func (c *ConverterImpl) apiContentV2GetCardsListPostOKCardsItemSizesItemToPProductsProduct_Size(source client.ContentV2GetCardsListPostOKCardsItemSizesItem) *products.Product_Size {
|
||||
var productsProduct_Size products.Product_Size
|
||||
if source.Skus != nil {
|
||||
productsProduct_Size.Skus = make([]string, len(source.Skus))
|
||||
for i := 0; i < len(source.Skus); i++ {
|
||||
productsProduct_Size.Skus[i] = source.Skus[i]
|
||||
}
|
||||
}
|
||||
return &productsProduct_Size
|
||||
}
|
||||
13
internal/wb/products/repository.go
Normal file
13
internal/wb/products/repository.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package products
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sipro-mps/internal/marketplace"
|
||||
)
|
||||
|
||||
type Repository interface {
|
||||
GetAllProducts(ctx context.Context, marketplaceId int) ([]WbProduct, error)
|
||||
StreamAllProducts(ctx context.Context, marketplaceId int, resultChan chan<- []WbProduct, errChan chan<- error)
|
||||
StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []PbProduct, errChan chan<- error)
|
||||
ParseMarketplace(ctx context.Context, marketplaceId int) (*marketplace.Marketplace, string, error)
|
||||
}
|
||||
224
internal/wb/products/repository_api.go
Normal file
224
internal/wb/products/repository_api.go
Normal file
@@ -0,0 +1,224 @@
|
||||
package products
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/go-faster/errors"
|
||||
"github.com/redis/rueidis"
|
||||
"github.com/samber/lo"
|
||||
pb "sipro-mps/api/generated/v1/wb/products"
|
||||
"sipro-mps/internal/marketplace"
|
||||
"sipro-mps/internal/redis"
|
||||
"sipro-mps/internal/tasks/client"
|
||||
"sipro-mps/internal/tasks/types"
|
||||
"sipro-mps/internal/wb"
|
||||
"sipro-mps/internal/wb/products/mapping/generated"
|
||||
wbapi "sipro-mps/pkg/api/wb/client"
|
||||
)
|
||||
|
||||
const (
|
||||
maxRetries = 5
|
||||
maxProductsPerRequest = 100
|
||||
)
|
||||
|
||||
type apiRepository struct {
|
||||
marketplaceRepository marketplace.Repository
|
||||
}
|
||||
|
||||
func (a apiRepository) ParseMarketplace(ctx context.Context, marketplaceId int) (*marketplace.Marketplace, string, error) {
|
||||
marketplaceByID, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
_, claims, err := wb.DecodeWildberriesJwt(marketplaceByID.AuthDataJson)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
sellerId := claims["sid"].(string)
|
||||
return marketplaceByID, sellerId, nil
|
||||
}
|
||||
|
||||
func fetchProducts(
|
||||
ctx context.Context,
|
||||
client *wbapi.Client,
|
||||
sellerId string,
|
||||
resultChan chan<- []WbProduct,
|
||||
errChan chan<- error,
|
||||
) {
|
||||
defer close(resultChan)
|
||||
defer close(errChan)
|
||||
request := wbapi.ContentV2GetCardsListPostReq{}
|
||||
request.Settings.SetTo(wbapi.ContentV2GetCardsListPostReqSettings{})
|
||||
|
||||
request.Settings.Value.Cursor.SetTo(wbapi.ContentV2GetCardsListPostReqSettingsCursor{})
|
||||
request.Settings.Value.Cursor.Value.Limit.SetTo(maxProductsPerRequest)
|
||||
|
||||
request.Settings.Value.Filter.SetTo(wbapi.ContentV2GetCardsListPostReqSettingsFilter{})
|
||||
request.Settings.Value.Filter.Value.WithPhoto.SetTo(-1)
|
||||
currentRetry := 0
|
||||
for {
|
||||
response, err := client.ContentV2GetCardsListPost(ctx, &request, wbapi.ContentV2GetCardsListPostParams{Locale: wbapi.NewOptString("ru")})
|
||||
if err != nil {
|
||||
currentRetry++
|
||||
if currentRetry >= maxRetries {
|
||||
errChan <- fmt.Errorf("fetching product IDs: %w", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
currentRetry = 0
|
||||
|
||||
switch r := response.(type) {
|
||||
case *wbapi.ContentV2GetCardsListPostOKHeaders:
|
||||
err = wb.SyncRateLimitRemaining(ctx, sellerId, r.XRatelimitRemaining.Value)
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("syncing rate limit: %w", err)
|
||||
return
|
||||
}
|
||||
resultChan <- r.Response.Cards
|
||||
if r.Response.Cursor.Value.Total.Value < maxProductsPerRequest {
|
||||
return
|
||||
}
|
||||
request.Settings.Value.Cursor.Value.UpdatedAt.SetTo(r.Response.Cursor.Value.UpdatedAt.Value)
|
||||
request.Settings.Value.Cursor.Value.NmID.SetTo(r.Response.Cursor.Value.NmID.Value)
|
||||
case *wbapi.R429Headers:
|
||||
err = wb.SetRateLimitRetry(ctx, sellerId, r.XRatelimitRetry.Value, r.XRatelimitLimit.Value, r.XRatelimitReset.Value)
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("setting rate limit retry: %w", err)
|
||||
return
|
||||
}
|
||||
default:
|
||||
errChan <- fmt.Errorf("unexpected response type: %T", r)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []pb.Product, errChan chan<- error) {
|
||||
defer close(resultChan)
|
||||
defer close(errChan)
|
||||
_, sellerId, err := a.ParseMarketplace(ctx, marketplaceId)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
c := *redis.Client
|
||||
key := fmt.Sprintf("wb:products:%s", sellerId)
|
||||
jsonString, err := c.Do(ctx, c.B().Get().Key(key).Build()).ToString()
|
||||
if err == nil && jsonString != "null" {
|
||||
var result []pb.Product
|
||||
err = json.Unmarshal([]byte(jsonString), &result)
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("unmarshalling products from cache: %w", err)
|
||||
return
|
||||
}
|
||||
task, err := types.NewFetchProductsTask(marketplaceId)
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("creating fetch products task: %w", err)
|
||||
return
|
||||
}
|
||||
_, err = client.Client.Enqueue(task)
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("enqueueing fetch products task: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
resultChan <- result
|
||||
return
|
||||
}
|
||||
if !errors.As(err, &rueidis.Nil) && err != nil {
|
||||
errChan <- fmt.Errorf("fetching products from cache: %w", err)
|
||||
return
|
||||
}
|
||||
converter := generated.ConverterImpl{}
|
||||
|
||||
innerResultChan := make(chan []WbProduct)
|
||||
innerErrChan := make(chan error)
|
||||
go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan)
|
||||
var allProducts []pb.Product
|
||||
defer func() {
|
||||
jsonData, err := json.Marshal(allProducts)
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("marshalling products to cache: %w", err)
|
||||
return
|
||||
}
|
||||
err = c.Do(ctx, c.B().Set().Key(key).Value(string(jsonData)).Build()).Error()
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("setting products to cache: %w", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case err, ok := <-innerErrChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
errChan <- fmt.Errorf("streaming products: %w", err)
|
||||
return
|
||||
case products, ok := <-innerResultChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
pbProducts := lo.Map(products, func(p WbProduct, _ int) pb.Product {
|
||||
return *converter.ToProto(&p)
|
||||
})
|
||||
allProducts = append(allProducts, pbProducts...)
|
||||
resultChan <- pbProducts
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
func (a apiRepository) GetAllProducts(ctx context.Context, marketplaceId int) ([]WbProduct, error) {
|
||||
marketplaceByID, sellerId, err := a.ParseMarketplace(ctx, marketplaceId)
|
||||
fromMarketplace, err := wb.GetClientFromMarketplace(marketplaceByID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resultChan := make(chan []WbProduct)
|
||||
errChan := make(chan error)
|
||||
go fetchProducts(ctx, fromMarketplace, sellerId, resultChan, errChan)
|
||||
|
||||
var products []WbProduct
|
||||
isWaiting := true
|
||||
for {
|
||||
if !isWaiting {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case err, ok := <-errChan:
|
||||
if !ok {
|
||||
isWaiting = false
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
case newProducts, ok := <-resultChan:
|
||||
if !ok {
|
||||
isWaiting = false
|
||||
}
|
||||
products = append(products, newProducts...)
|
||||
}
|
||||
}
|
||||
return products, nil
|
||||
}
|
||||
|
||||
func (a apiRepository) StreamAllProducts(ctx context.Context, marketplaceId int, resultChan chan<- []WbProduct, errChan chan<- error) {
|
||||
marketplaceByID, sellerId, err := a.ParseMarketplace(ctx, marketplaceId)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
fromMarketplace, err := wb.GetClientFromMarketplace(marketplaceByID)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
go fetchProducts(ctx, fromMarketplace, sellerId, resultChan, errChan)
|
||||
}
|
||||
|
||||
func NewAPIRepository(marketplaceRepository marketplace.Repository) Repository {
|
||||
return &apiRepository{
|
||||
marketplaceRepository: marketplaceRepository,
|
||||
}
|
||||
}
|
||||
170
internal/wb/rate_limiter.go
Normal file
170
internal/wb/rate_limiter.go
Normal file
@@ -0,0 +1,170 @@
|
||||
package wb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/redis/rueidis"
|
||||
"net/http"
|
||||
"sipro-mps/internal/redis"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultBucketCapacity = 10 // max burst size
|
||||
refillRate = 100.0 / 60000 // 300 requests per minute → 1 token per 200ms
|
||||
tokenTTLMillis = 60000 // Redis key TTL: 60s
|
||||
)
|
||||
|
||||
var tokenBucketScript = rueidis.NewLuaScript(`
|
||||
local key = KEYS[1]
|
||||
local now = tonumber(ARGV[1])
|
||||
local default_capacity = tonumber(ARGV[2])
|
||||
local refill_rate = tonumber(ARGV[3])
|
||||
local ttl = tonumber(ARGV[4])
|
||||
|
||||
-- Retry lock
|
||||
local retry_key = key .. ":retry_until"
|
||||
local retry_until = tonumber(redis.call("GET", retry_key))
|
||||
if retry_until and now < retry_until then
|
||||
return retry_until - now
|
||||
end
|
||||
|
||||
-- Token Bucket
|
||||
local capacity_key = key .. ":capacity"
|
||||
local token_key = key .. ":tokens"
|
||||
local time_key = key .. ":last_refill"
|
||||
|
||||
local capacity = tonumber(redis.call("GET", capacity_key)) or default_capacity
|
||||
local tokens = tonumber(redis.call("GET", token_key))
|
||||
local last_refill = tonumber(redis.call("GET", time_key))
|
||||
|
||||
if tokens == nil then tokens = capacity end
|
||||
if last_refill == nil then last_refill = now end
|
||||
|
||||
local elapsed = now - last_refill
|
||||
local refill = elapsed * refill_rate
|
||||
tokens = math.min(capacity, tokens + refill)
|
||||
last_refill = now
|
||||
|
||||
if tokens >= 1 then
|
||||
tokens = tokens - 1
|
||||
redis.call("SET", token_key, tokens)
|
||||
redis.call("SET", time_key, last_refill)
|
||||
redis.call("PEXPIRE", token_key, ttl)
|
||||
redis.call("PEXPIRE", time_key, ttl)
|
||||
return 0
|
||||
else
|
||||
local wait_time = math.ceil((1 - tokens) / refill_rate)
|
||||
return wait_time
|
||||
end
|
||||
`)
|
||||
|
||||
type RateLimitTransport struct {
|
||||
http.RoundTripper
|
||||
}
|
||||
|
||||
func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
ctx := req.Context()
|
||||
|
||||
tokenString := req.Header.Get("Authorization")
|
||||
authData := NewWbAuthData(tokenString)
|
||||
authDataBytes, err := json.Marshal(authData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal Wildberries auth data: %w", err)
|
||||
}
|
||||
_, claims, err := DecodeWildberriesJwt(authDataBytes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode Wildberries JWT: %w", err)
|
||||
}
|
||||
sellerId := claims["sid"].(string)
|
||||
if sellerId == "" {
|
||||
return nil, fmt.Errorf("sellerId is required in JWT claims")
|
||||
}
|
||||
now := time.Now().UnixMilli()
|
||||
client := *redis.Client
|
||||
|
||||
waitTime, err := tokenBucketScript.Exec(ctx, client, []string{sellerId}, []string{
|
||||
fmt.Sprintf("%d", now),
|
||||
fmt.Sprintf("%d", defaultBucketCapacity),
|
||||
fmt.Sprintf("%f", refillRate),
|
||||
fmt.Sprintf("%d", tokenTTLMillis),
|
||||
}).ToInt64()
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rate limit script error: %w", err)
|
||||
}
|
||||
|
||||
if waitTime > 0 {
|
||||
select {
|
||||
case <-time.After(time.Duration(waitTime) * time.Millisecond):
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
return t.RoundTripper.RoundTrip(req)
|
||||
}
|
||||
|
||||
func SyncRateLimitRemaining(ctx context.Context, sellerId string, remaining int) error {
|
||||
if sellerId == "" || remaining < 0 {
|
||||
return fmt.Errorf("invalid sellerId or remaining")
|
||||
}
|
||||
now := time.Now().UnixMilli()
|
||||
client := *redis.Client
|
||||
|
||||
cmds := []rueidis.Completed{
|
||||
client.B().Set().Key(sellerId + ":capacity").Value(fmt.Sprintf("%d", defaultBucketCapacity)).Ex(time.Minute).Build(),
|
||||
client.B().Set().Key(sellerId + ":tokens").Value(fmt.Sprintf("%d", remaining)).Ex(time.Minute).Build(),
|
||||
client.B().Set().Key(sellerId + ":last_refill").Value(fmt.Sprintf("%d", now)).Ex(time.Minute).Build(),
|
||||
}
|
||||
|
||||
results := client.DoMulti(ctx, cmds...)
|
||||
for _, res := range results {
|
||||
if res.Error() != nil {
|
||||
return fmt.Errorf("failed to sync rate limit: %w", res.Error())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetRateLimitRetry(ctx context.Context, sellerId string, retrySeconds int, limit int, resetSeconds int) error {
|
||||
if sellerId == "" {
|
||||
return fmt.Errorf("sellerId is required")
|
||||
}
|
||||
now := time.Now()
|
||||
retryUntil := now.Add(time.Duration(retrySeconds) * time.Second).UnixMilli()
|
||||
client := *redis.Client
|
||||
|
||||
cmds := []rueidis.Completed{
|
||||
client.B().Set().
|
||||
Key(sellerId + ":retry_until").
|
||||
Value(fmt.Sprintf("%d", retryUntil)).
|
||||
Px(time.Duration(retrySeconds+5) * time.Second).Build(),
|
||||
}
|
||||
|
||||
if limit > 0 {
|
||||
cmds = append(cmds, client.B().Set().
|
||||
Key(sellerId+":capacity").
|
||||
Value(fmt.Sprintf("%d", limit)).
|
||||
Ex(time.Hour).Build())
|
||||
}
|
||||
|
||||
if resetSeconds > 0 {
|
||||
resetAt := now.Add(time.Duration(resetSeconds) * time.Second)
|
||||
fmt.Printf("Seller %s rate limit resets at %v (limit: %d)\n", sellerId, resetAt, limit)
|
||||
}
|
||||
|
||||
results := client.DoMulti(ctx, cmds...)
|
||||
for _, res := range results {
|
||||
if res.Error() != nil {
|
||||
return fmt.Errorf("failed to set retry info: %w", res.Error())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewRateLimitTransport() *RateLimitTransport {
|
||||
return &RateLimitTransport{RoundTripper: http.DefaultTransport}
|
||||
}
|
||||
22
internal/wb/security_handler.go
Normal file
22
internal/wb/security_handler.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package wb
|
||||
|
||||
import (
|
||||
"context"
|
||||
wbclient "sipro-mps/pkg/api/wb/client"
|
||||
)
|
||||
|
||||
type WildberriesSecurityHandler struct {
|
||||
ApiKey string
|
||||
}
|
||||
|
||||
func (sh WildberriesSecurityHandler) HeaderApiKey(ctx context.Context, operationName wbclient.OperationName, client *wbclient.Client) (wbclient.HeaderApiKey, error) {
|
||||
return wbclient.HeaderApiKey{
|
||||
APIKey: sh.ApiKey,
|
||||
Roles: nil,
|
||||
}, nil
|
||||
}
|
||||
func NewWildberriesSecurityHandler(apiKey string) WildberriesSecurityHandler {
|
||||
return WildberriesSecurityHandler{
|
||||
ApiKey: apiKey,
|
||||
}
|
||||
}
|
||||
1
internal/wb/types.go
Normal file
1
internal/wb/types.go
Normal file
@@ -0,0 +1 @@
|
||||
package wb
|
||||
Reference in New Issue
Block a user