feat: add protobuf message compression and decompression for Redis; refactor product fetching logic

This commit is contained in:
2025-08-17 06:16:15 +03:00
parent 38acc4a443
commit abbcc0a81a
1396 changed files with 609 additions and 451436 deletions

View File

@@ -2,10 +2,11 @@ package products
import (
"fmt"
"github.com/samber/lo"
"google.golang.org/grpc"
pb "sipro-mps/api/generated/v1/wb/products"
"sipro-mps/internal/marketplace"
"sipro-mps/pkg/utils"
"google.golang.org/grpc"
)
type AdapterGRPC struct {
@@ -41,9 +42,7 @@ func (a *AdapterGRPC) GetProducts(req *pb.GetProductsRequest, stream pb.Products
return nil
}
resp := &pb.GetProductsResponse{
Products: lo.Map(products, func(p pb.Product, _ int) *pb.Product {
return &p
}),
Products: utils.ToPtrs(products),
}
if err := stream.Send(resp); err != nil {
fmt.Println("error sending response", err)

View File

@@ -5,12 +5,14 @@ import (
"fmt"
pb "sipro-mps/api/generated/v1/wb/products"
"sipro-mps/internal/marketplace"
"sipro-mps/internal/redis"
"sipro-mps/internal/tasks/client"
"sipro-mps/internal/tasks/types"
"sipro-mps/internal/wb"
"sipro-mps/internal/wb/products/mapping/generated"
wbapi "sipro-mps/pkg/api/wb/client"
"sipro-mps/pkg/utils"
"github.com/deliveryhero/pipeline/v2"
"github.com/samber/lo"
)
@@ -104,87 +106,51 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
return
}
client, err := wb.GetClientFromMarketplace(mp)
if err != nil {
converter := generated.ConverterImpl{}
key := fmt.Sprintf("wb:products:%s", identifier)
var cachedMessage pb.GetProductsResponse
err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
if err == nil && len(cachedMessage.Products) > 0 {
resultChan <- utils.DerefSlice(cachedMessage.Products)
_ = client.EnqueueFetchProductsTask(types.TypeWbFetchProducts, marketplaceId)
return
}
converter := generated.ConverterImpl{}
transform := pipeline.NewProcessor(func(_ context.Context, products []WbProduct) ([]pb.Product, error) {
return lo.Map(products, func(item WbProduct, _ int) pb.Product {
return *converter.ToProto(&item)
}), nil
}, nil)
inputChan := make(chan []WbProduct)
fetchProducts(ctx, client, identifier, inputChan, nil)
for out := range pipeline.Process(ctx, transform, inputChan) {
resultChan <- out
}
//c := *redis.Client
//key := fmt.Sprintf("wb:products:%s", sellerId)
//jsonString, err := c.Do(ctx, c.B().Get().Key(key).Build()).ToString()
//if err == nil && jsonString != "null" {
// var result []pb.Product
// err = json.Unmarshal([]byte(jsonString), &result)
// if err != nil {
// errChan <- fmt.Errorf("unmarshalling products from cache: %w", err)
// return
// }
// task, err := types.NewFetchProductsTask(types.TypeWbFetchProducts, marketplaceId)
// if err != nil {
// errChan <- fmt.Errorf("creating fetch products task: %w", err)
// return
// }
// _, err = client.Client.Enqueue(task)
// if err != nil {
// errChan <- fmt.Errorf("enqueueing fetch products task: %w", err)
// return
// }
//
// resultChan <- result
// return
//}
//if !errors.As(err, &rueidis.Nil) && err != nil {
// errChan <- fmt.Errorf("fetching products from cache: %w", err)
// return
//}
//converter := generated.ConverterImpl{}
//
//innerResultChan := make(chan []WbProduct)
//innerErrChan := make(chan error)
//go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan)
//var allProducts []pb.Product
//defer func() {
// jsonData, err := json.Marshal(allProducts)
// if err != nil {
// errChan <- fmt.Errorf("marshalling products to cache: %w", err)
// return
// }
// err = c.Do(ctx, c.B().Set().Key(key).Value(string(jsonData)).Build()).Error()
// if err != nil {
// errChan <- fmt.Errorf("setting products to cache: %w", err)
// return
// }
//}()
//for {
// select {
// case err, ok := <-innerErrChan:
// if !ok {
// return
// }
// errChan <- fmt.Errorf("streaming products: %w", err)
// return
// case products, ok := <-innerResultChan:
// if !ok {
// return
// }
// pbProducts := lo.Map(products, func(p WbProduct, _ int) pb.Product {
// return *converter.ToProto(&p)
// })
// allProducts = append(allProducts, pbProducts...)
// resultChan <- pbProducts
// }
//}
innerResultChan := make(chan []WbProduct)
innerErrChan := make(chan error)
go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan)
var allProducts []pb.Product
defer func() {
if len(allProducts) == 0 {
return
}
message := pb.GetProductsResponse{
Products: utils.ToPtrs(allProducts),
}
_ = redis.WriteProtoMessage(ctx, key, &message)
}()
for {
select {
case err, ok := <-innerErrChan:
if !ok {
return
}
errChan <- fmt.Errorf("streaming products: %w", err)
return
case products, ok := <-innerResultChan:
if !ok {
return
}
pbProducts := lo.Map(products, func(p WbProduct, _ int) pb.Product {
return *converter.ToProto(&p)
})
allProducts = append(allProducts, pbProducts...)
resultChan <- pbProducts
}
}
}
func (a apiRepository) GetAllProducts(ctx context.Context, marketplaceId int) ([]WbProduct, error) {