feat: refactor product attributes caching logic and add HashArray utility function
This commit is contained in:
@@ -199,27 +199,7 @@ func (a *apiRepository) StreamProductAttributesCache(ctx context.Context, market
|
|||||||
errChan <- err
|
errChan <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
identifider, err := mp.GetIdentifier()
|
|
||||||
if err != nil {
|
|
||||||
errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
key := fmt.Sprintf("ozon:products:attributes:%s", identifider)
|
|
||||||
var cachedMessage pb.GetProductAttributesResponse
|
|
||||||
err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
|
|
||||||
if err == nil && len(cachedMessage.Items) > 0 {
|
|
||||||
resultChan <- utils.DerefSlice(cachedMessage.Items)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
converter := generated.ConverterImpl{}
|
converter := generated.ConverterImpl{}
|
||||||
var allAttributes []PbProductAttributes
|
|
||||||
defer func() {
|
|
||||||
if len(allAttributes) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
message := pb.GetProductAttributesResponse{Items: utils.ToPtrs(allAttributes)}
|
|
||||||
_ = redis.WriteProtoMessage(ctx, key, &message)
|
|
||||||
}()
|
|
||||||
|
|
||||||
for _, chunk := range lo.Chunk(productIds, 1000) {
|
for _, chunk := range lo.Chunk(productIds, 1000) {
|
||||||
chunkStrings := lo.Map(chunk, func(item int, index int) string {
|
chunkStrings := lo.Map(chunk, func(item int, index int) string {
|
||||||
@@ -241,10 +221,7 @@ func (a *apiRepository) StreamProductAttributesCache(ctx context.Context, market
|
|||||||
return *converter.AttributesToProto(&item)
|
return *converter.AttributesToProto(&item)
|
||||||
})
|
})
|
||||||
resultChan <- attrs
|
resultChan <- attrs
|
||||||
allAttributes = append(allAttributes, attrs...)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
//ozonClient.Products().GetDescriptionOfProducts()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *apiRepository) DeleteProducts(ctx context.Context, marketplaceId int, items []*PbDeleteProductRequestItem) ([]*PbDeleteProductResponseItem, error) {
|
func (a *apiRepository) DeleteProducts(ctx context.Context, marketplaceId int, items []*PbDeleteProductRequestItem) ([]*PbDeleteProductResponseItem, error) {
|
||||||
|
|||||||
@@ -95,7 +95,8 @@ func fetchProducts(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []pb.Product, errChan chan<- error) {
|
func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []pb.Product, errChan chan<- error) {
|
||||||
// DO NOT close channels here - WithCache will handle it (caller/creator owns them)
|
defer close(resultChan)
|
||||||
|
defer close(errChan)
|
||||||
mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
|
mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- fmt.Errorf("getting marketplace by ID: %w", err)
|
errChan <- fmt.Errorf("getting marketplace by ID: %w", err)
|
||||||
|
|||||||
@@ -1,8 +1,11 @@
|
|||||||
package utils
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
|
"math"
|
||||||
|
|
||||||
"github.com/golang-jwt/jwt/v5"
|
"github.com/golang-jwt/jwt/v5"
|
||||||
)
|
)
|
||||||
@@ -49,3 +52,74 @@ func DerefSlice[T any](s []*T) []T {
|
|||||||
}
|
}
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Hashable interface {
|
||||||
|
~int | ~int8 | ~int16 | ~int32 | ~int64 |
|
||||||
|
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
|
||||||
|
~string | ~bool | ~float32 | ~float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func HashArray[T Hashable](arr []T) (string, error) {
|
||||||
|
h := fnv.New64a()
|
||||||
|
buf := make([]byte, 8)
|
||||||
|
|
||||||
|
for _, v := range arr {
|
||||||
|
var b []byte
|
||||||
|
|
||||||
|
switch val := any(v).(type) {
|
||||||
|
case int:
|
||||||
|
binary.LittleEndian.PutUint64(buf, uint64(val))
|
||||||
|
b = buf
|
||||||
|
case int8:
|
||||||
|
buf[0] = byte(val)
|
||||||
|
b = buf[:1]
|
||||||
|
case int16:
|
||||||
|
binary.LittleEndian.PutUint16(buf, uint16(val))
|
||||||
|
b = buf[:2]
|
||||||
|
case int32:
|
||||||
|
binary.LittleEndian.PutUint32(buf, uint32(val))
|
||||||
|
b = buf[:4]
|
||||||
|
case int64:
|
||||||
|
binary.LittleEndian.PutUint64(buf, uint64(val))
|
||||||
|
b = buf
|
||||||
|
case uint:
|
||||||
|
binary.LittleEndian.PutUint64(buf, uint64(val))
|
||||||
|
b = buf
|
||||||
|
case uint8:
|
||||||
|
buf[0] = val
|
||||||
|
b = buf[:1]
|
||||||
|
case uint16:
|
||||||
|
binary.LittleEndian.PutUint16(buf, val)
|
||||||
|
b = buf[:2]
|
||||||
|
case uint32:
|
||||||
|
binary.LittleEndian.PutUint32(buf, val)
|
||||||
|
b = buf[:4]
|
||||||
|
case uint64:
|
||||||
|
binary.LittleEndian.PutUint64(buf, val)
|
||||||
|
b = buf
|
||||||
|
case float32:
|
||||||
|
binary.LittleEndian.PutUint32(buf[:4], math.Float32bits(val))
|
||||||
|
b = buf[:4]
|
||||||
|
case float64:
|
||||||
|
binary.LittleEndian.PutUint64(buf, math.Float64bits(val))
|
||||||
|
b = buf
|
||||||
|
case bool:
|
||||||
|
if val {
|
||||||
|
buf[0] = 1
|
||||||
|
} else {
|
||||||
|
buf[0] = 0
|
||||||
|
}
|
||||||
|
b = buf[:1]
|
||||||
|
case string:
|
||||||
|
b = []byte(val)
|
||||||
|
default:
|
||||||
|
return "", fmt.Errorf("unsupported type: %T", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := h.Write(b); err != nil {
|
||||||
|
return "", fmt.Errorf("failed to write to hash: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("%x", h.Sum64()), nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user