This commit is contained in:
2025-09-28 20:19:45 +03:00
parent 6638ef1b5f
commit 3fd63d5f32
21 changed files with 356 additions and 355 deletions

View File

@@ -2,13 +2,15 @@ package ozon
import (
"errors"
"git.denco.store/fakz9/ozon-api-client/ozon"
"github.com/tidwall/gjson"
"net/http"
"sipro-mps/internal/marketplace"
"git.denco.store/fakz9/ozon-api-client/ozon"
"github.com/redis/rueidis"
"github.com/tidwall/gjson"
)
func GetClientFromMarketplace(mp *marketplace.Marketplace) (*ozon.Client, error) {
func GetClientFromMarketplace(redis rueidis.Client, mp *marketplace.Marketplace) (*ozon.Client, error) {
authDataParsed := gjson.Parse(mp.AuthData)
clientIdResult := authDataParsed.Get("clientId")
@@ -19,7 +21,7 @@ func GetClientFromMarketplace(mp *marketplace.Marketplace) (*ozon.Client, error)
apiKey := apiKeyResult.String()
clientId := clientIdResult.String()
httpClient := &http.Client{
Transport: NewRateLimitTransport(),
Transport: NewRateLimitTransport(redis),
}
opts := []ozon.ClientOption{
ozon.WithAPIKey(apiKey),

View File

@@ -3,10 +3,10 @@ package products
import (
"context"
"fmt"
pb "sipro-mps/api/generated/v1/ozon/products"
"github.com/samber/lo"
"google.golang.org/grpc"
pb "sipro-mps/api/generated/v1/ozon/products"
"sipro-mps/internal/marketplace"
)
type AdapterGRPC struct {
@@ -20,13 +20,10 @@ func NewAdapterGRPC(repo Repository) *AdapterGRPC {
}
}
// RegisterAdapterGRPC registers the gRPC server for the Products service.
func RegisterAdapterGRPC(server *grpc.Server, marketplaceRepo marketplace.Repository) (repo *Repository, err error) {
apiRepo := NewAPIRepository(marketplaceRepo)
adapter := NewAdapterGRPC(apiRepo)
pb.RegisterProductsServiceServer(server, adapter)
return &apiRepo, nil
func Register(server *grpc.Server, repo Repository) {
pb.RegisterProductsServiceServer(server, NewAdapterGRPC(repo))
}
func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error {
ctx := stream.Context()
fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId)

View File

@@ -0,0 +1,8 @@
package products
import "go.uber.org/fx"
var Module = fx.Options(
fx.Provide(NewAPIRepository),
fx.Invoke(Register),
)

View File

@@ -7,26 +7,28 @@ import (
"sipro-mps/internal/marketplace"
"sipro-mps/internal/ozon"
"sipro-mps/internal/ozon/products/mapping/generated"
"sipro-mps/internal/redis"
"sipro-mps/pkg/utils"
"strconv"
"sync"
api "git.denco.store/fakz9/ozon-api-client/ozon"
"github.com/redis/rueidis"
"github.com/samber/lo"
)
type apiRepository struct {
marketplaceRepository marketplace.Repository
redis rueidis.Client
}
func GetProductsKey(identifier string) string {
return fmt.Sprintf("ozon:products:%s:compressed", identifier)
}
func NewAPIRepository(marketplaceRepository marketplace.Repository) Repository {
func NewAPIRepository(marketplaceRepository marketplace.Repository, redis rueidis.Client) Repository {
return &apiRepository{
marketplaceRepository: marketplaceRepository,
redis: redis,
}
}
@@ -91,7 +93,7 @@ func (a *apiRepository) GetAllProducts(ctx context.Context, marketplaceId int) (
if err != nil {
return nil, err
}
clientFromMarketplace, err := ozon.GetClientFromMarketplace(mp)
clientFromMarketplace, err := ozon.GetClientFromMarketplace(a.redis, mp)
if err != nil {
return nil, err
}
@@ -115,7 +117,7 @@ func (a *apiRepository) StreamAllProducts(ctx context.Context, marketplaceId int
errChan <- err
return
}
clientFromMarketplace, err := ozon.GetClientFromMarketplace(mp)
clientFromMarketplace, err := ozon.GetClientFromMarketplace(a.redis, mp)
if err != nil {
errChan <- err
return
@@ -128,31 +130,31 @@ func (a *apiRepository) StreamAllProducts(ctx context.Context, marketplaceId int
func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []PbProduct, errChan chan<- error) {
defer close(resultChan)
defer close(errChan)
mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
_, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
if err != nil {
errChan <- err
return
}
identifier, err := mp.GetIdentifier()
//identifier, err := mp.GetIdentifier()
if err != nil {
errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
return
}
key := GetProductsKey(identifier)
//key := GetProductsKey(identifier)
var cachedMessage pb.GetListOfProductsResponse
err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
//err = a.redis.ReadProtoMessage(ctx, key, &cachedMessage)
if err == nil && len(cachedMessage.Products) > 0 {
resultChan <- utils.DerefSlice(cachedMessage.Products)
//_ = client.EnqueueFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId)
return
}
locker := *redis.Locker
_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ozon:products:marketplace:%s:lock", key))
//locker := *redis.Locker
//_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ozon:products:marketplace:%s:lock", key))
if err != nil {
return
}
defer cancel()
//defer cancel()
innerResultChan := make(chan []OzonProduct)
innerErrChan := make(chan error)
@@ -164,8 +166,8 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
if len(allProducts) == 0 {
return
}
message := pb.GetListOfProductsResponse{Products: utils.ToPtrs(allProducts)}
_ = redis.WriteProtoMessage(ctx, key, &message)
//message := pb.GetListOfProductsResponse{Products: utils.ToPtrs(allProducts)}
//_ = redis.WriteProtoMessage(ctx, key, &message)
}()
for {
select {
@@ -202,23 +204,23 @@ func (a *apiRepository) StreamProductAttributesCache(ctx context.Context, market
errChan <- err
return
}
identifier, err := mp.GetIdentifier()
if err != nil {
errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
return
}
ozonClient, err := ozon.GetClientFromMarketplace(mp)
//identifier, err := mp.GetIdentifier()
//if err != nil {
// errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
// return
//}
ozonClient, err := ozon.GetClientFromMarketplace(a.redis, mp)
if err != nil {
errChan <- err
return
}
key := fmt.Sprintf("ozon:product_attributes:%s:lock", identifier)
locker := *redis.Locker
_, cancel, err := locker.WithContext(ctx, key)
if err != nil {
return
}
defer cancel()
//key := fmt.Sprintf("ozon:product_attributes:%s:lock", identifier)
//locker := *redis.Locker
//_, cancel, err := locker.WithContext(ctx, key)
//if err != nil {
// return
//}
//defer cancel()
converter := generated.ConverterImpl{}
@@ -253,24 +255,24 @@ func (a *apiRepository) DeleteProducts(ctx context.Context, marketplaceId int, i
return nil, err
}
identifier, err := mp.GetIdentifier()
if err != nil {
return nil, fmt.Errorf("getting marketplace identifier: %w", err)
}
//identifier, err := mp.GetIdentifier()
//if err != nil {
// return nil, fmt.Errorf("getting marketplace identifier: %w", err)
//}
ozonClient, err := ozon.GetClientFromMarketplace(mp)
ozonClient, err := ozon.GetClientFromMarketplace(a.redis, mp)
if err != nil {
return nil, err
}
key := fmt.Sprintf("ozon:products_delete:%s:lock", identifier)
locker := *redis.Locker
_, cancel, err := locker.WithContext(ctx, key)
if err != nil {
return nil, err
}
defer cancel()
//key := fmt.Sprintf("ozon:products_delete:%s:lock", identifier)
//locker := *redis.Locker
//_, cancel, err := locker.WithContext(ctx, key)
//if err != nil {
// return nil, err
//
//}
//defer cancel()
// Step 1: map the items into a slice
mapped := lo.Map(items, func(item *PbDeleteProductRequestItem, _ int) *PbDeleteProductResponseItem {
@@ -322,24 +324,24 @@ func (a *apiRepository) CreateOrUpdateProducts(ctx context.Context, marketplaceI
return nil, err
}
ozonClient, err := ozon.GetClientFromMarketplace(mp)
ozonClient, err := ozon.GetClientFromMarketplace(a.redis, mp)
if err != nil {
return nil, err
}
identifier, err := mp.GetIdentifier()
if err != nil {
return nil, fmt.Errorf("getting marketplace identifier: %w", err)
//identifier, err := mp.GetIdentifier()
//if err != nil {
// return nil, fmt.Errorf("getting marketplace identifier: %w", err)
//
//}
}
key := fmt.Sprintf("ozon:products_create_update:%s:lock", identifier)
locker := *redis.Locker
_, cancel, err := locker.WithContext(ctx, key)
if err != nil {
return nil, err
}
defer cancel()
//key := fmt.Sprintf("ozon:products_create_update:%s:lock", identifier)
//locker := *redis.Locker
//_, cancel, err := locker.WithContext(ctx, key)
//if err != nil {
// return nil, err
//
//}
//defer cancel()
converter := generated.ConverterImpl{}
pageSize := 100

View File

@@ -2,10 +2,10 @@ package ozon
import (
"fmt"
"github.com/redis/rueidis"
"net/http"
"sipro-mps/internal/redis"
"time"
"github.com/redis/rueidis"
)
const (
@@ -40,6 +40,7 @@ var (
type RateLimitTransport struct {
http.RoundTripper
redis rueidis.Client
}
func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
@@ -47,7 +48,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error
clientId := req.Header.Get("Client-Id")
now := time.Now().UnixNano()
waitTime, err := rateLimiterScript.Exec(ctx, *redis.Client, []string{clientId}, []string{
waitTime, err := rateLimiterScript.Exec(ctx, t.redis, []string{clientId}, []string{
fmt.Sprintf("%d", now),
fmt.Sprintf("%d", int64(windowSize)),
fmt.Sprintf("%d", rps),
@@ -61,7 +62,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error
}
return t.RoundTripper.RoundTrip(req)
}
func NewRateLimitTransport() *RateLimitTransport {
func NewRateLimitTransport(redis rueidis.Client) *RateLimitTransport {
return &RateLimitTransport{RoundTripper: http.DefaultTransport}
return &RateLimitTransport{RoundTripper: http.DefaultTransport, redis: redis}
}