diff --git a/.env b/.env index 727105b..8b54f38 100644 --- a/.env +++ b/.env @@ -1,12 +1,15 @@ # Redis -REDIS_HOST=localhost +REDIS_HOST=redis +#REDIS_HOST=localhost REDIS_PORT=6379 -REDIS_PASSWORD=ELdhsgqJt5QZUSWKU5vY3D9CXa1a0teIceeHqvCtoPkrDJ0Lge7XIe8187gFjd0qZLv9zwhGr62MqY +#REDIS_PASSWORD=ELdhsgqJt5QZUSWKU5vY3D9CXa1a0teIceeHqvCtoPkrDJ0Lge7XIe8187gFjd0qZLv9zwhGr62MqY +REDIS_PASSWORD= +REDIS_ADDR="${REDIS_HOST}:${REDIS_PORT}" # PostgreSQL POSTGRES_LOGIN=postgres -POSTGRES_PASSWORD=GjitkeYf%5Beq +POSTGRES_PASSWORD=GjitkYf%5Beq POSTGRES_PORT=5432 POSTGRES_DATABASE=sipro -POSTGRES_HOST=localhost +POSTGRES_HOST=62.217.177.161 POSTGRES_URL="postgresql://${POSTGRES_LOGIN}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DATABASE}" diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index d89eb13..0000000 --- a/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM alpine:latest -WORKDIR /app -COPY main . -COPY ".env" . -RUN apk add gcompat -CMD ["./main"] \ No newline at end of file diff --git a/build-docker.sh b/build-docker.sh new file mode 100755 index 0000000..8bb9102 --- /dev/null +++ b/build-docker.sh @@ -0,0 +1,9 @@ + +go build -o cmd/server/main cmd/server/main.go +go build -o cmd/tasks_server/main cmd/tasks_server/main.go + +docker build -t git.denco.store/fakz9/sipro-marketplaces:latest cmd/server +docker build -t git.denco.store/fakz9/sipro-marketplaces-worker:latest cmd/tasks_server + +docker push git.denco.store/fakz9/sipro-marketplaces:latest +docker push git.denco.store/fakz9/sipro-marketplaces-worker:latest \ No newline at end of file diff --git a/cmd/test/main.go b/cmd/test/main.go index 8f6f935..0ad8b3f 100644 --- a/cmd/test/main.go +++ b/cmd/test/main.go @@ -1,33 +1,14 @@ package main import ( - "context" - "fmt" - "github.com/go-faster/errors" "github.com/hibiken/asynq" "github.com/joho/godotenv" - "github.com/redis/rueidis" "sipro-mps/internal/config" - "sipro-mps/internal/redis" - "sipro-mps/internal/tasks" + "sipro-mps/internal/tasks/types" ) func main() { godotenv.Load() - ctx := context.Background() - - redis.InitClient(ctx) - c := *redis.Client - key := fmt.Sprintf("wb:products:%d", "test") - v, err := c.Do(ctx, c.B().Get().Key(key).Build()).ToString() - if err != nil { - if errors.As(err, &rueidis.Nil) { - fmt.Println("Key does not exist in Redis:", key) - return - } - } - fmt.Println("Value from Redis:", v) - return cfg, err := config.LoadConfig() if err != nil { panic(err) @@ -39,7 +20,7 @@ func main() { panic(err) } }(client) - task, err := tasks.NewFetchProductsTask(1130) + task, err := types.NewFetchProductsTask(types.TypeOzonFetchProducts, 930) if err != nil { panic(err) } diff --git a/docker-compose.yml b/docker-compose.yml index 9cf510e..2c2464a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.8' services: app: image: git.denco.store/fakz9/sipro-marketplaces:latest diff --git a/internal/marketplace/entities.go b/internal/marketplace/entities.go index 3f21459..c06903b 100644 --- a/internal/marketplace/entities.go +++ b/internal/marketplace/entities.go @@ -1,5 +1,17 @@ package marketplace +import ( + "encoding/json" + "github.com/go-faster/errors" + "sipro-mps/pkg/utils" +) + +const ( + WildberriesBaseMarketplace = 0 + OzonBaseMarketplace = 1 + YandexBaseMarketplace = 2 +) + type Marketplace struct { ID int `json:"id"` BaseMarketplace int `json:"base_marketplace"` @@ -7,3 +19,39 @@ type Marketplace struct { WarehouseID string `json:"warehouse_id"` AuthDataJson []byte `json:"auth_data_json,omitempty"` } + +func (m *Marketplace) getIdentifierWildberries() (string, error) { + _, claims, err := utils.DecodeWildberriesJwt(m.AuthDataJson) + if err != nil { + return "", err + } + sellerId := claims["sid"].(string) + return sellerId, err +} + +func (m *Marketplace) getIdentifierOzon() (string, error) { + if m.AuthDataJson == nil { + return "", nil + } + var authData map[string]interface{} + if err := json.Unmarshal(m.AuthDataJson, &authData); err != nil { + return "", errors.Wrap(err, "unmarshal AuthDataJson") + } + if identifier, ok := authData["clientId"].(string); ok { + return identifier, nil + } + return "", errors.New("identifier not found in AuthDataJson") +} + +func (m *Marketplace) GetIdentifier() (string, error) { + switch m.BaseMarketplace { + case WildberriesBaseMarketplace: + return m.getIdentifierWildberries() + case OzonBaseMarketplace: + return m.getIdentifierOzon() + case YandexBaseMarketplace: + return "", errors.New("Yandex marketplace does not support identifier retrieval") + default: + return "", errors.Errorf("unknown base marketplace: %d", m.BaseMarketplace) + } +} diff --git a/internal/ozon/products/adapter_grpc.go b/internal/ozon/products/adapter_grpc.go index 7afad8c..b3d53db 100644 --- a/internal/ozon/products/adapter_grpc.go +++ b/internal/ozon/products/adapter_grpc.go @@ -6,7 +6,6 @@ import ( "google.golang.org/grpc" pb "sipro-mps/api/generated/v1/ozon/products" "sipro-mps/internal/marketplace" - "sipro-mps/internal/ozon/products/mapping/generated" ) type AdapterGRPC struct { @@ -30,12 +29,10 @@ func RegisterAdapterGRPC(server *grpc.Server, marketplaceRepo marketplace.Reposi func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error { ctx := stream.Context() fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId) - converter := generated.ConverterImpl{} - resultChan := make(chan []OzonProduct) + resultChan := make(chan []pb.Product) errChan := make(chan error) - go g.repo.StreamAllProducts(ctx, int(req.MarketplaceId), resultChan, errChan) // Запускаем в горутине - + go g.repo.StreamAllProductsCache(ctx, int(req.MarketplaceId), resultChan, errChan) // Запускаем в горутине for { select { case <-ctx.Done(): @@ -45,11 +42,10 @@ func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream if !ok { return nil // Завершаем, только если результат закрыт } - protoProducts := lo.Map(products, func(product OzonProduct, _ int) *pb.Product { - return converter.ToProto(&product) - }) resp := &pb.GetListOfProductsResponse{ - Products: protoProducts, + Products: lo.Map(products, func(p pb.Product, _ int) *pb.Product { + return &p + }), } if err := stream.Send(resp); err != nil { fmt.Println("GetListOfProducts: error sending response:", err) diff --git a/internal/ozon/products/entities.go b/internal/ozon/products/entities.go index 29210a1..5568640 100644 --- a/internal/ozon/products/entities.go +++ b/internal/ozon/products/entities.go @@ -1,5 +1,9 @@ package products -import "git.denco.store/fakz9/ozon-api-client/ozon" +import ( + "git.denco.store/fakz9/ozon-api-client/ozon" + pb "sipro-mps/api/generated/v1/ozon/products" +) type OzonProduct = ozon.ProductDetails +type PbProduct = pb.Product diff --git a/internal/ozon/products/repository.go b/internal/ozon/products/repository.go index d4a5cf2..c1f5467 100644 --- a/internal/ozon/products/repository.go +++ b/internal/ozon/products/repository.go @@ -5,4 +5,5 @@ import "context" type Repository interface { GetAllProducts(ctx context.Context, marketplaceId int) ([]OzonProduct, error) StreamAllProducts(ctx context.Context, marketplaceId int, resultChan chan<- []OzonProduct, errChan chan<- error) + StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []PbProduct, errChan chan<- error) } diff --git a/internal/ozon/products/repository_api.go b/internal/ozon/products/repository_api.go index 2f4513d..3fc8158 100644 --- a/internal/ozon/products/repository_api.go +++ b/internal/ozon/products/repository_api.go @@ -2,11 +2,16 @@ package products import ( "context" + "encoding/json" "fmt" api "git.denco.store/fakz9/ozon-api-client/ozon" "github.com/samber/lo" "sipro-mps/internal/marketplace" "sipro-mps/internal/ozon" + "sipro-mps/internal/ozon/products/mapping/generated" + "sipro-mps/internal/redis" + "sipro-mps/internal/tasks/client" + "sipro-mps/internal/tasks/types" "sync" ) @@ -81,7 +86,7 @@ func (a *apiRepository) GetAllProducts(ctx context.Context, marketplaceId int) ( if err != nil { return nil, err } - client, err := ozon.GetClientFromMarketplace(mp) + clientFromMarketplace, err := ozon.GetClientFromMarketplace(mp) if err != nil { return nil, err } @@ -89,15 +94,13 @@ func (a *apiRepository) GetAllProducts(ctx context.Context, marketplaceId int) ( productIdsChan := make(chan []int64) producsChan := make(chan []OzonProduct) errChan := make(chan error) - go fetchProductIds(ctx, client, productIdsChan, errChan) - go fetchProducts(ctx, client, productIdsChan, producsChan, errChan) + go fetchProductIds(ctx, clientFromMarketplace, productIdsChan, errChan) + go fetchProducts(ctx, clientFromMarketplace, productIdsChan, producsChan, errChan) for products := range producsChan { for _, product := range products { - fmt.Println(product.Name) items = append(items, product) } } - fmt.Println(len(items)) return items, nil } @@ -107,12 +110,95 @@ func (a *apiRepository) StreamAllProducts(ctx context.Context, marketplaceId int errChan <- err return } - client, err := ozon.GetClientFromMarketplace(mp) + clientFromMarketplace, err := ozon.GetClientFromMarketplace(mp) if err != nil { errChan <- err return } productIdsChan := make(chan []int64) - go fetchProductIds(ctx, client, productIdsChan, errChan) - go fetchProducts(ctx, client, productIdsChan, resultChan, errChan) + go fetchProductIds(ctx, clientFromMarketplace, productIdsChan, errChan) + go fetchProducts(ctx, clientFromMarketplace, productIdsChan, resultChan, errChan) +} + +func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []PbProduct, errChan chan<- error) { + defer close(resultChan) + defer close(errChan) + mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId) + if err != nil { + errChan <- err + return + } + identifier, err := mp.GetIdentifier() + if err != nil { + errChan <- fmt.Errorf("getting marketplace identifier: %w", err) + return + } + r := *redis.Client + key := fmt.Sprintf("ozon:products:%s", identifier) + + jsonString, err := r.Do(ctx, r.B().Get().Key(key).Build()).ToString() + if err == nil && jsonString != "null" { + var result []PbProduct + err = json.Unmarshal([]byte(jsonString), &result) + if err != nil { + errChan <- fmt.Errorf("unmarshalling products from cache: %w", err) + return + } + task, err := types.NewFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId) + if err != nil { + errChan <- fmt.Errorf("creating fetch products task: %w", err) + return + } + _, err = client.Client.Enqueue(task) + if err != nil { + errChan <- fmt.Errorf("enqueueing fetch products task: %w", err) + return + } + resultChan <- result + return + } + + innerResultChan := make(chan []OzonProduct) + innerErrChan := make(chan error) + go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan) + + converter := generated.ConverterImpl{} + var allProducts []PbProduct + defer func() { + value, err := json.Marshal(allProducts) + if err != nil { + errChan <- fmt.Errorf("marshalling products to JSON: %w", err) + return + } + + err = r.Do(ctx, r.B().Set().Key(key).Value(string(value)).Build()).Error() + if err != nil { + errChan <- fmt.Errorf("setting products in cache: %w", err) + return + } + }() + for { + select { + case err, ok := <-innerErrChan: + if !ok { + return + } + if err != nil { + errChan <- fmt.Errorf("streaming products: %w", err) + return + } + case products, ok := <-innerResultChan: + if !ok && len(products) == 0 { + return + } + pbProducts := lo.Map(products, func(p OzonProduct, _ int) PbProduct { + return *converter.ToProto(&p) + }) + allProducts = append(allProducts, pbProducts...) + resultChan <- pbProducts + if !ok { + return + } + } + } } diff --git a/internal/tasks/ozon/ozon.go b/internal/tasks/ozon/ozon.go new file mode 100644 index 0000000..e2195f7 --- /dev/null +++ b/internal/tasks/ozon/ozon.go @@ -0,0 +1,66 @@ +package ozon + +import ( + "context" + "encoding/json" + "fmt" + "github.com/hibiken/asynq" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/samber/lo" + "sipro-mps/internal/marketplace" + "sipro-mps/internal/ozon/products/mapping/generated" + "sipro-mps/internal/redis" + "sipro-mps/internal/tasks/types" + + "sipro-mps/internal/ozon/products" +) + +type FetchProductsProcessor struct { + Dbpool *pgxpool.Pool +} + +func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Task) error { + var payload types.FetchProductsTask + err := payload.Unmarshal(task) + if err != nil { + return asynq.SkipRetry + } + marketplaceRepo := marketplace.NewDBRepository(p.Dbpool) + marketplaceById, err := marketplaceRepo.GetMarketplaceByID(ctx, payload.MarketplaceId) + if err != nil { + return asynq.SkipRetry + } + identifier, err := marketplaceById.GetIdentifier() + if err != nil { + return asynq.SkipRetry + } + lockKey := fmt.Sprintf("ozon:products:marketplace:%s:lock", identifier) + locker := *redis.Locker + _, cancel, err := locker.TryWithContext(ctx, lockKey) + if err != nil { + fmt.Printf("Failed to acquire lock for marketplace %s: %v\n", identifier, err) + return asynq.SkipRetry + } + defer cancel() + ozonRepo := products.NewAPIRepository(marketplaceRepo) + + productsRaw, err := ozonRepo.GetAllProducts(ctx, payload.MarketplaceId) + if err != nil { + return fmt.Errorf("failed to fetch products for marketplace %d: %w", payload.MarketplaceId, err) + } + converter := generated.ConverterImpl{} + productsProto := lo.Map(productsRaw, func(item products.OzonProduct, _ int) *products.PbProduct { + return converter.ToProto(&item) + }) + productsJson, err := json.Marshal(productsProto) + if err != nil { + return asynq.SkipRetry + } + redisClient := *redis.Client + productsKey := fmt.Sprintf("ozon:products:%s", identifier) + err = redisClient.Do(ctx, redisClient.B().Set().Key(productsKey).Value(string(productsJson)).Build()).Error() + if err != nil { + return err + } + return nil +} diff --git a/internal/tasks/server/server.go b/internal/tasks/server/server.go index 1cd3aec..9dc5835 100644 --- a/internal/tasks/server/server.go +++ b/internal/tasks/server/server.go @@ -4,6 +4,7 @@ import ( "github.com/hibiken/asynq" "github.com/jackc/pgx/v5/pgxpool" "sipro-mps/internal/config" + "sipro-mps/internal/tasks/ozon" "sipro-mps/internal/tasks/types" "sipro-mps/internal/tasks/wb" ) @@ -24,13 +25,14 @@ func (s *AsynqServer) createMux() *asynq.ServeMux { // Register task handlers here mux.Handle(types.TypeWbFetchProducts, &wb.FetchProductsProcessor{Dbpool: s.dbpool}) + mux.Handle(types.TypeOzonFetchProducts, &ozon.FetchProductsProcessor{Dbpool: s.dbpool}) return mux } func (s *AsynqServer) Run() { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: s.redisConfig.Addr, Password: s.redisConfig.Password}, - asynq.Config{Concurrency: 10}, + asynq.Config{}, ) mux := s.createMux() if err := srv.Run(mux); err != nil { diff --git a/internal/tasks/types/common.go b/internal/tasks/types/common.go index b530688..5ea43cf 100644 --- a/internal/tasks/types/common.go +++ b/internal/tasks/types/common.go @@ -1,5 +1,6 @@ package types const ( - TypeWbFetchProducts = "wb:fetch_products" + TypeWbFetchProducts = "wb:fetch_products" + TypeOzonFetchProducts = "ozon:fetch_products" ) diff --git a/internal/tasks/types/types.go b/internal/tasks/types/types.go new file mode 100644 index 0000000..1ee8bdc --- /dev/null +++ b/internal/tasks/types/types.go @@ -0,0 +1,24 @@ +package types + +import ( + "encoding/json" + "github.com/hibiken/asynq" + "time" +) + +type FetchProductsTask struct { + MarketplaceId int +} + +func (t *FetchProductsTask) Unmarshal(task *asynq.Task) error { + return json.Unmarshal(task.Payload(), t) + +} + +func NewFetchProductsTask(taskType string, marketplaceId int) (*asynq.Task, error) { + payload, err := json.Marshal(&FetchProductsTask{MarketplaceId: marketplaceId}) + if err != nil { + return nil, err + } + return asynq.NewTask(taskType, payload, asynq.MaxRetry(2), asynq.Timeout(20*time.Minute)), nil +} diff --git a/internal/tasks/types/wb.go b/internal/tasks/types/wb.go deleted file mode 100644 index 00454a3..0000000 --- a/internal/tasks/types/wb.go +++ /dev/null @@ -1,19 +0,0 @@ -package types - -import ( - "encoding/json" - "github.com/hibiken/asynq" - "time" -) - -type FetchProductsTask struct { - MarketplaceId int -} - -func NewFetchProductsTask(marketplaceId int) (*asynq.Task, error) { - payload, err := json.Marshal(&FetchProductsTask{MarketplaceId: marketplaceId}) - if err != nil { - return nil, err - } - return asynq.NewTask(TypeWbFetchProducts, payload, asynq.MaxRetry(2), asynq.Timeout(20*time.Minute)), nil -} diff --git a/internal/tasks/wb/wb.go b/internal/tasks/wb/wb.go index f3f8f31..5a9d66b 100644 --- a/internal/tasks/wb/wb.go +++ b/internal/tasks/wb/wb.go @@ -8,16 +8,15 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/samber/lo" pb "sipro-mps/api/generated/v1/wb/products" - mp_repo "sipro-mps/internal/marketplace" + "sipro-mps/internal/marketplace" "sipro-mps/internal/redis" "sipro-mps/internal/tasks/types" - wb_products_repo "sipro-mps/internal/wb/products" - conv "sipro-mps/internal/wb/products/mapping/generated" + "sipro-mps/internal/wb/products" + "sipro-mps/internal/wb/products/mapping/generated" ) type FetchProductsProcessor struct { Dbpool *pgxpool.Pool - wbRepo wb_products_repo.Repository } func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Task) error { @@ -25,11 +24,15 @@ func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Ta if err := json.Unmarshal(task.Payload(), &payload); err != nil { return asynq.SkipRetry } - marketplaceRepo := mp_repo.NewDBRepository(p.Dbpool) - repo := wb_products_repo.NewAPIRepository(marketplaceRepo) - _, sellerId, err := repo.ParseMarketplace(ctx, payload.MarketplaceId) + marketplaceRepo := marketplace.NewDBRepository(p.Dbpool) + repo := products.NewAPIRepository(marketplaceRepo) + marketplaceById, err := marketplaceRepo.GetMarketplaceByID(ctx, payload.MarketplaceId) if err != nil { - return fmt.Errorf("failed to parse marketplace %d: %w", payload.MarketplaceId, err) + return fmt.Errorf("failed to get marketplace by ID %d: %w", payload.MarketplaceId, err) + } + sellerId, err := marketplaceById.GetIdentifier() + if err != nil { + return fmt.Errorf("failed to get identifier for marketplace %d: %w", payload.MarketplaceId, err) } locker := *redis.Locker _, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("wb:products:marketplace:%s:lock", sellerId)) @@ -47,12 +50,12 @@ func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Ta if err != nil { return fmt.Errorf("failed to fetch products for marketplace %d: %w", payload.MarketplaceId, err) } - converter := conv.ConverterImpl{} - products := lo.Map(productsRaw, func(item wb_products_repo.WbProduct, _ int) *pb.Product { + converter := generated.ConverterImpl{} + productsProto := lo.Map(productsRaw, func(item products.WbProduct, _ int) *pb.Product { return converter.ToProto(&item) }) redisClient := *redis.Client - productsJson, err := json.Marshal(products) + productsJson, err := json.Marshal(productsProto) if err != nil { return fmt.Errorf("failed to marshal products: %w", err) } diff --git a/internal/wb/common.go b/internal/wb/common.go index 6368852..6cc4363 100644 --- a/internal/wb/common.go +++ b/internal/wb/common.go @@ -1,43 +1,17 @@ package wb import ( - "encoding/json" "fmt" - "github.com/golang-jwt/jwt/v5" "net/http" "sipro-mps/internal/marketplace" wbclient "sipro-mps/pkg/api/wb/client" + "sipro-mps/pkg/utils" "time" ) -type WbAuthData struct { - Token string `json:"token"` -} - -func NewWbAuthData(token string) WbAuthData { - return WbAuthData{ - Token: token, - } -} - -func DecodeWildberriesJwt(token []byte) (WbAuthData, jwt.MapClaims, error) { - var authData WbAuthData - err := json.Unmarshal(token, &authData) - if err != nil { - return authData, nil, fmt.Errorf("failed to unmarshal JWT: %w", err) - } - claims := jwt.MapClaims{} - _, _, err = jwt.NewParser().ParseUnverified(authData.Token, claims) - if err != nil { - return authData, nil, fmt.Errorf("invalid JWT: %w", err) - } - - return authData, claims, nil -} - func GetClientFromMarketplace(mp *marketplace.Marketplace) (*wbclient.Client, error) { - authData, claims, err := DecodeWildberriesJwt(mp.AuthDataJson) + authData, claims, err := utils.DecodeWildberriesJwt(mp.AuthDataJson) if err != nil { return nil, fmt.Errorf("failed to decode Wildberries JWT") } diff --git a/internal/wb/products/repository_api.go b/internal/wb/products/repository_api.go index d37f62b..1b9d123 100644 --- a/internal/wb/products/repository_api.go +++ b/internal/wb/products/repository_api.go @@ -15,6 +15,7 @@ import ( "sipro-mps/internal/wb" "sipro-mps/internal/wb/products/mapping/generated" wbapi "sipro-mps/pkg/api/wb/client" + "sipro-mps/pkg/utils" ) const ( @@ -31,7 +32,7 @@ func (a apiRepository) ParseMarketplace(ctx context.Context, marketplaceId int) if err != nil { return nil, "", err } - _, claims, err := wb.DecodeWildberriesJwt(marketplaceByID.AuthDataJson) + _, claims, err := utils.DecodeWildberriesJwt(marketplaceByID.AuthDataJson) if err != nil { return nil, "", err } @@ -113,7 +114,7 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId errChan <- fmt.Errorf("unmarshalling products from cache: %w", err) return } - task, err := types.NewFetchProductsTask(marketplaceId) + task, err := types.NewFetchProductsTask(types.TypeWbFetchProducts, marketplaceId) if err != nil { errChan <- fmt.Errorf("creating fetch products task: %w", err) return diff --git a/internal/wb/rate_limiter.go b/internal/wb/rate_limiter.go index 9c7aa27..4c469d1 100644 --- a/internal/wb/rate_limiter.go +++ b/internal/wb/rate_limiter.go @@ -7,6 +7,7 @@ import ( "github.com/redis/rueidis" "net/http" "sipro-mps/internal/redis" + "sipro-mps/pkg/utils" "time" ) @@ -68,12 +69,12 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error ctx := req.Context() tokenString := req.Header.Get("Authorization") - authData := NewWbAuthData(tokenString) + authData := utils.NewWbAuthData(tokenString) authDataBytes, err := json.Marshal(authData) if err != nil { return nil, fmt.Errorf("failed to marshal Wildberries auth data: %w", err) } - _, claims, err := DecodeWildberriesJwt(authDataBytes) + _, claims, err := utils.DecodeWildberriesJwt(authDataBytes) if err != nil { return nil, fmt.Errorf("failed to decode Wildberries JWT: %w", err) } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go old mode 100755 new mode 100644 index d4b585b..f05baa8 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -1 +1,32 @@ package utils + +import ( + "encoding/json" + "fmt" + "github.com/golang-jwt/jwt/v5" +) + +type WbAuthData struct { + Token string `json:"token"` +} + +func NewWbAuthData(token string) WbAuthData { + return WbAuthData{ + Token: token, + } +} + +func DecodeWildberriesJwt(token []byte) (WbAuthData, jwt.MapClaims, error) { + var authData WbAuthData + err := json.Unmarshal(token, &authData) + if err != nil { + return authData, nil, fmt.Errorf("failed to unmarshal JWT: %w", err) + } + claims := jwt.MapClaims{} + _, _, err = jwt.NewParser().ParseUnverified(authData.Token, claims) + if err != nil { + return authData, nil, fmt.Errorf("invalid JWT: %w", err) + } + + return authData, claims, nil +} diff --git a/pkgs b/pkgs deleted file mode 100755 index 430fe4c..0000000 --- a/pkgs +++ /dev/null @@ -1,10 +0,0 @@ -go get github.com/gofiber/fiber/v2 -go get github.com/twmb/franz-go -go get github.com/redis/go-redis/v9 -go get google.golang.org/grpc -go get github.com/lib/pq -go get github.com/rs/zerolog/log -go get github.com/bytedance/sonic -go get github.com/gofiber/swagger -go get github.com/golang-jwt/jwt/v5 -go get github.com/carlmjohnson/requests \ No newline at end of file diff --git a/щдв b/щдв deleted file mode 100644 index d9da9fa..0000000 --- a/щдв +++ /dev/null @@ -1,225 +0,0 @@ -////// import ( -////// -////// "Sipro-Marketplaces/internal/marketplace" -////// ozon_products "Sipro-Marketplaces/internal/ozon/products" -////// "fmt" -////// "google.golang.org/grpc" -////// "net" -////// -////// ) -////// -////// func createGrpcServer() { -////// -////// lis, err := net.Listen("tcp", ":8080") -////// if err != nil { -////// fmt.Printf("failed to listen: %v\n", err) -////// return -////// } -////// grpcServer := grpc.NewServer() -////// -////// repo, err := marketplace.RegisterAdapterGRPC(grpcServer) -////// if err != nil { -////// fmt.Printf("failed to register gRPC server: %v\n", err) -////// return -////// } -////// _, err = ozon_products.RegisterAdapterGRPC(grpcServer, *repo) -////// if err != nil { -////// fmt.Printf("failed to register Ozon Products gRPC server: %v\n", err) -////// return -////// } -////// -////// fmt.Println("gRPC server registered successfully.") -////// // Start serving gRPC requests -////// fmt.Println("gRPC server is starting on port 8080...") -////// -////// if err := grpcServer.Serve(lis); err != nil { -////// fmt.Printf("failed to serve: %v\n", err) -////// } -////// fmt.Println("gRPC server created.") -////// } -////// -////// func main() { -////// fmt.Println("Starting Sipro-Marketplaces server...") -////// createGrpcServer() -////// } -////package main -//// -////import ( -//// "Sipro-Marketplaces/internal/marketplace" -//// "Sipro-Marketplaces/internal/ozon" -//// "context" -//// "encoding/json" -//// "fmt" -//// api "git.denco.store/fakz9/ozon-api-client/ozon" -//// "github.com/jackc/pgx/v5" -//// "time" -////) -//// -////func main() { -//// -//// startTime := time.Now() -//// -//// ctx := context.Background() -//// -//// conn, err := pgx.Connect(ctx, "postgresql://postgres:GjitkeYf%5Beq@/sipro?host=/run/postgresql") -//// if err != nil { -//// panic(err) -//// } -//// defer conn.Close(ctx) -//// mpRepo := marketplace.NewDBRepository(conn) -//// mp, err := mpRepo.GetMarketplaceByID(ctx, 262) -//// if err != nil { -//// panic(err) -//// } -//// client, err := ozon.GetClientFromMarketplace(mp) -//// if err != nil { -//// panic(err) -//// } -//// -//// productIdsChan := make(chan []int64) -//// -//// go func() { -//// defer close(productIdsChan) -//// -//// var lastId string -//// -//// for { -//// rsp, err := client.Products().GetListOfProducts( -//// ctx, -//// &api.GetListOfProductsParams{ -//// Filter: api.GetListOfProductsFilter{ -//// Visibility: "ALL", -//// }, -//// LastId: lastId, -//// Limit: 1000, -//// }, -//// ) -//// if err != nil { -//// // You may want to send error on another channel or handle differently -//// panic(err) -//// } -//// -//// items := rsp.Result.Items -//// if len(items) == 0 { -//// break -//// } -//// -//// productIds := make([]int64, len(items)) -//// for i, item := range items { -//// productIds[i] = item.ProductId -//// } -//// -//// productIdsChan <- productIds -//// -//// lastId = rsp.Result.LastId -//// if lastId == "" { -//// break -//// } -//// } -//// }() -//// -//// productsChan := make(chan []string) -//// go func() { -//// defer close(productsChan) -//// for productIds := range productIdsChan { -//// go func(ids []int64) { -//// products, err := client.Products().ListProductsByIDs( -//// ctx, &api.ListProductsByIDsParams{ -//// OfferId: nil, -//// ProductId: ids, -//// SKU: nil, -//// }) -//// if err != nil { -//// // Handle error appropriately, e.g., log it or send it to an error channel -//// panic(err) -//// } -//// productsEncoded := make([]string, len(products.Items)) -//// for i, product := range products.Items { -//// jsonString, err := json.Marshal(product) -//// if err != nil { -//// // Handle error appropriately, e.g., log it or send it to an error channel -//// panic(err) -//// } -//// productsEncoded[i] = string(jsonString) -//// } -//// productsChan <- productsEncoded -//// }(productIds) -//// } -//// -//// }() -//// total := 0 -//// for products := range productsChan { -//// for _ = range products { -//// // Here you can process each product, e.g., save to database or print -//// // For demonstration, we will just print the product -//// total += 1 -//// } -//// if len(products) == 0 { -//// fmt.Println("No more products to process.") -//// break -//// } -//// } -//// elapsedTime := time.Since(startTime) -//// println("Total time taken:", elapsedTime.String()) -//// println("Total products processed:", total) -//// -////} -// -//package main -// -//import ( -// "context" -// "fmt" -// "time" -// -// "github.com/redis/rueidis" -//) -// -//func main() { -// fmt.Println("Hello, World!") -// client, err := rueidis.NewClient(rueidis.ClientOption{ -// InitAddress: []string{"localhost:6379"}, -// Password: "ELdhsgqJt5QZUSWKU5vY3D9CXa1a0teIceeHqvCtoPkrDJ0Lge7XIe8187gFjd0qZLv9zwhGr62MqY", -// }) -// if err != nil { -// fmt.Printf("Failed to create Redis client: %v\n", err) -// return -// } -// defer client.Close() -// script := rueidis.NewLuaScript(` -// local key = KEYS[1] -// local now = tonumber(ARGV[1]) -// local window = tonumber(ARGV[2]) -// local limit = tonumber(ARGV[3]) -// -// redis.call('ZREMRANGEBYSCORE', key, '-inf', now - window) -// local count = redis.call('ZCARD', key) -// -// if count < limit then -// redis.call('ZADD', key, now, now) -// redis.call('EXPIRE', key, math.ceil(window / 1000000000)) -// return 1 -// end -// return 0 -// `) -// ctx := context.Background() -// windowSize := time.Second -// for i := 0; i < 51; i++ { -// now := time.Now().UnixNano() -// result, err := script.Exec(ctx, client, []string{"2282"}, []string{ -// fmt.Sprintf("%d", now), -// fmt.Sprintf("%d", int64(windowSize)), -// fmt.Sprintf("%d", 50), -// }).ToInt64() -// if err != nil { -// fmt.Printf("Failed to execute script: %v\n", err) -// return -// } -// if result == 1 { -// //fmt.Println("Request allowed") -// } else { -// fmt.Println("Rate limit exceeded") -// } -// } -// -//} \ No newline at end of file