Add gRPC server implementation and database integration for marketplace and products

This commit is contained in:
2025-05-27 03:41:52 +03:00
parent 008f3df42d
commit b083cccc09
44 changed files with 2182 additions and 1201 deletions

35
internal/ozon/common.go Normal file
View File

@@ -0,0 +1,35 @@
package ozon
import (
"errors"
"git.denco.store/fakz9/ozon-api-client/ozon"
"github.com/tidwall/gjson"
"net/http"
"sipro-mps/internal/marketplace"
)
func GetClientFromMarketplace(mp *marketplace.Marketplace) (*ozon.Client, error) {
authDataParsed := gjson.Parse(mp.AuthData)
clientIdResult := authDataParsed.Get("clientId")
apiKeyResult := authDataParsed.Get("clientToken")
if !clientIdResult.Exists() || !apiKeyResult.Exists() {
return nil, errors.New("auth data is not valid")
}
apiKey := apiKeyResult.String()
clientId := clientIdResult.String()
httpClient := &http.Client{
Transport: NewRateLimitTransport(),
}
opts := []ozon.ClientOption{
ozon.WithAPIKey(apiKey),
ozon.WithClientId(clientId),
ozon.WithHttpClient(httpClient),
}
client := ozon.NewClient(opts...)
if client == nil {
return nil, errors.New("failed to create ozon client")
}
return client, nil
}

View File

@@ -0,0 +1,62 @@
package products
import (
"github.com/samber/lo"
"google.golang.org/grpc"
pb "sipro-mps/api/generated/v1/ozon/products"
"sipro-mps/internal/marketplace"
"sipro-mps/internal/ozon/products/mapping/generated"
)
type AdapterGRPC struct {
pb.UnimplementedProductsServiceServer
repo Repository
}
func NewAdapterGRPC(repo Repository) *AdapterGRPC {
return &AdapterGRPC{
repo: repo,
}
}
// RegisterAdapterGRPC registers the gRPC server for the Products service.
func RegisterAdapterGRPC(server *grpc.Server, marketplaceRepo marketplace.Repository) (repo *Repository, err error) {
apiRepo := NewAPIRepository(marketplaceRepo)
adapter := NewAdapterGRPC(apiRepo)
pb.RegisterProductsServiceServer(server, adapter)
return &apiRepo, nil
}
func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error {
ctx := stream.Context()
converter := generated.ConverterImpl{}
resultChan := make(chan []OzonProduct)
errChan := make(chan error)
g.repo.StreamAllProducts(ctx, 262, resultChan, errChan)
for {
select {
case <-ctx.Done():
return ctx.Err() // Handle context cancellation
case products, ok := <-resultChan:
if !ok {
return nil
}
protoProducts := lo.Map(products, func(product OzonProduct, _ int) *pb.Product {
return converter.ToProto(&product)
})
resp := &pb.GetListOfProductsResponse{
Products: protoProducts,
}
if err := stream.Send(resp); err != nil {
return err // Error sending response
}
case err, ok := <-errChan:
if !ok {
return nil // Exit loop when errChan is closed
}
if err != nil {
return err
}
}
}
}

View File

@@ -0,0 +1,5 @@
package products
import "git.denco.store/fakz9/ozon-api-client/ozon"
type OzonProduct = ozon.ProductDetails

View File

@@ -0,0 +1,20 @@
package mapping
import (
internal "git.denco.store/fakz9/ozon-api-client/ozon"
proto "sipro-mps/api/generated/v1/ozon/products"
)
//go:generate go run github.com/jmattheis/goverter/cmd/goverter gen -g 'ignoreUnexported yes' .
// goverter:converter
// goverter:extend Int632ToInt64
type Converter interface {
// goverter:ignore state sizeCache unknownFields
ToProto(details *internal.ProductDetails) *proto.Product
}
func Int632ToInt64(i int32) int64 {
return int64(i)
}

View File

@@ -0,0 +1,56 @@
// Code generated by github.com/jmattheis/goverter, DO NOT EDIT.
//go:build !goverter
package generated
import (
ozon "git.denco.store/fakz9/ozon-api-client/ozon"
products "sipro-mps/api/generated/v1/ozon/products"
ozon "git.denco.store/fakz9/ozon-api-client/ozon"
)
type ConverterImpl struct{}
func (c *ConverterImpl) ToProto(source *ozon.ProductDetails) *products.Product {
var pProductsProduct *products.Product
if source != nil {
var productsProduct products.Product
productsProduct.Id = (*source).Id
productsProduct.OfferId = (*source).OfferId
productsProduct.Stocks = c.ozonProductDetailStockToPProductsProduct_Stocks((*source).Stocks)
if (*source).Barcodes != nil {
productsProduct.Barcodes = make([]string, len((*source).Barcodes))
for i := 0; i < len((*source).Barcodes); i++ {
productsProduct.Barcodes[i] = (*source).Barcodes[i]
}
}
productsProduct.Statuses = c.ozonProductDetailsStatusToPProductsProduct_Status((*source).Statuses)
pProductsProduct = &productsProduct
}
return pProductsProduct
}
func (c *ConverterImpl) ozonProductDetailStockStockToPProductsProduct_Stock(source ozon.ProductDetailStockStock) *products.Product_Stock {
var productsProduct_Stock products.Product_Stock
productsProduct_Stock.Present = mapping.Int632ToInt64(source.Present)
productsProduct_Stock.Reserved = mapping.Int632ToInt64(source.Reserved)
productsProduct_Stock.SKU = source.SKU
productsProduct_Stock.Source = source.Source
return &productsProduct_Stock
}
func (c *ConverterImpl) ozonProductDetailStockToPProductsProduct_Stocks(source ozon.ProductDetailStock) *products.Product_Stocks {
var productsProduct_Stocks products.Product_Stocks
if source.Stocks != nil {
productsProduct_Stocks.Stocks = make([]*products.Product_Stock, len(source.Stocks))
for i := 0; i < len(source.Stocks); i++ {
productsProduct_Stocks.Stocks[i] = c.ozonProductDetailStockStockToPProductsProduct_Stock(source.Stocks[i])
}
}
productsProduct_Stocks.HasStock = source.HasStock
return &productsProduct_Stocks
}
func (c *ConverterImpl) ozonProductDetailsStatusToPProductsProduct_Status(source ozon.ProductDetailsStatus) *products.Product_Status {
var productsProduct_Status products.Product_Status
productsProduct_Status.StatusName = source.StatusName
return &productsProduct_Status
}

View File

@@ -0,0 +1,8 @@
package products
import "context"
type Repository interface {
GetAllProducts(ctx context.Context, marketplaceId int) ([]OzonProduct, error)
StreamAllProducts(ctx context.Context, marketplaceId int, resultChan chan<- []OzonProduct, errChan chan<- error)
}

View File

@@ -0,0 +1,118 @@
package products
import (
"context"
"fmt"
api "git.denco.store/fakz9/ozon-api-client/ozon"
"github.com/samber/lo"
"sipro-mps/internal/marketplace"
"sipro-mps/internal/ozon"
"sync"
)
type apiRepository struct {
marketplaceRepository marketplace.Repository
}
func NewAPIRepository(marketplaceRepository marketplace.Repository) Repository {
return &apiRepository{
marketplaceRepository: marketplaceRepository,
}
}
func fetchProductIds(ctx context.Context, client *api.Client, resultChan chan<- []int64, errChan chan<- error) {
defer close(resultChan)
lastId := ""
for {
resp, err := client.Products().GetListOfProducts(ctx, &api.GetListOfProductsParams{
Filter: api.GetListOfProductsFilter{Visibility: "ALL"},
LastId: lastId,
Limit: 1000,
})
if err != nil {
// dev
panic(err)
//errChan <- fmt.Errorf("fetching product IDs: %w", err)
return
}
items := resp.Result.Items
if len(items) == 0 {
break
}
productIds := lo.Map(items, func(item api.GetListOfProductsResultItem, _ int) int64 { return item.ProductId })
resultChan <- productIds
lastId = resp.Result.LastId
if lastId == "" {
break
}
}
}
func fetchProducts(ctx context.Context, client *api.Client, productIdsChan <-chan []int64, resultChan chan<- []OzonProduct, errChan chan<- error) {
defer close(resultChan)
defer close(errChan)
wg := sync.WaitGroup{}
for productIds := range productIdsChan {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := client.Products().ListProductsByIDs(ctx, &api.ListProductsByIDsParams{
ProductId: productIds,
})
if err != nil {
// dev
panic(err)
//errChan <- fmt.Errorf("fetching products: %w", err)
return
}
items := resp.Items
resultChan <- items
}()
}
wg.Wait()
}
func (a *apiRepository) GetAllProducts(ctx context.Context, marketplaceId int) ([]OzonProduct, error) {
mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
if err != nil {
return nil, err
}
client, err := ozon.GetClientFromMarketplace(mp)
if err != nil {
return nil, err
}
items := []OzonProduct{}
productIdsChan := make(chan []int64)
producsChan := make(chan []OzonProduct)
errChan := make(chan error)
go fetchProductIds(ctx, client, productIdsChan, errChan)
go fetchProducts(ctx, client, productIdsChan, producsChan, errChan)
for products := range producsChan {
for _, product := range products {
fmt.Println(product.Name)
items = append(items, product)
}
}
fmt.Println(len(items))
return items, nil
}
func (a *apiRepository) StreamAllProducts(ctx context.Context, marketplaceId int, resultChan chan<- []OzonProduct, errChan chan<- error) {
mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
if err != nil {
errChan <- err
return
}
client, err := ozon.GetClientFromMarketplace(mp)
if err != nil {
errChan <- err
return
}
productIdsChan := make(chan []int64)
go fetchProductIds(ctx, client, productIdsChan, errChan)
go fetchProducts(ctx, client, productIdsChan, resultChan, errChan)
}

View File

@@ -0,0 +1,67 @@
package ozon
import (
"fmt"
"github.com/redis/rueidis"
"net/http"
"sipro-mps/internal/redis"
"time"
)
const (
windowSize = time.Second
rps = 50 // requests per second
)
var (
rateLimiterScript = rueidis.NewLuaScript(`
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
-- Удаляем старые записи вне окна времени
redis.call('ZREMRANGEBYSCORE', key, '-inf', now - window)
local count = redis.call('ZCARD', key)
if count < limit then
-- Лимит не превышен, добавляем новую метку и устанавливаем TTL
redis.call('ZADD', key, now, now)
redis.call('EXPIRE', key, math.ceil(window / 1000000000))
return 0 -- Можно выполнять запрос сразу
else
-- Лимит превышен, находим самую старую метку
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')[2]
-- Возвращаем время, которое нужно подождать до освобождения слота
return (tonumber(oldest) + window) - now
end
`)
)
type RateLimitTransport struct {
http.RoundTripper
}
func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
ctx := req.Context()
clientId := req.Header.Get("Client-Id")
now := time.Now().UnixNano()
waitTime, err := rateLimiterScript.Exec(ctx, *redis.Client, []string{clientId}, []string{
fmt.Sprintf("%d", now),
fmt.Sprintf("%d", int64(windowSize)),
fmt.Sprintf("%d", 50),
}).ToInt64()
if err != nil {
return nil, fmt.Errorf("failed to execute rate limit script: %w", err)
}
if waitTime > 0 {
time.Sleep(time.Duration(waitTime))
}
return t.RoundTripper.RoundTrip(req)
}
func NewRateLimitTransport() *RateLimitTransport {
return &RateLimitTransport{RoundTripper: http.DefaultTransport}
}