mirror of
https://github.com/AikidoSec/safe-chain.git
synced 2026-05-26 12:10:49 +00:00
199 lines
5.7 KiB
JavaScript
199 lines
5.7 KiB
JavaScript
import { describe, it, mock } from "node:test";
|
|
import assert from "node:assert";
|
|
|
|
describe("pipInterceptor custom registries", async () => {
|
|
let lastPackage;
|
|
let malwareResponse = false;
|
|
let customRegistries = [];
|
|
|
|
mock.module("../../config/settings.js", {
|
|
namedExports: {
|
|
getPipCustomRegistries: () => customRegistries,
|
|
},
|
|
});
|
|
|
|
mock.module("../../scanning/audit/index.js", {
|
|
namedExports: {
|
|
isMalwarePackage: async (packageName, version) => {
|
|
lastPackage = { packageName, version };
|
|
return malwareResponse;
|
|
},
|
|
},
|
|
});
|
|
|
|
const { pipInterceptorForUrl } = await import("./pipInterceptor.js");
|
|
|
|
it("should create interceptor for custom registry", () => {
|
|
customRegistries = ["my-custom-registry.example.com"];
|
|
const url =
|
|
"https://my-custom-registry.example.com/packages/xx/yy/foobar-1.2.3.tar.gz";
|
|
|
|
const interceptor = pipInterceptorForUrl(url);
|
|
|
|
assert.ok(
|
|
interceptor,
|
|
"Interceptor should be created for custom registry"
|
|
);
|
|
});
|
|
|
|
it("should parse package from custom registry URL", async () => {
|
|
customRegistries = ["my-custom-registry.example.com"];
|
|
const url =
|
|
"https://my-custom-registry.example.com/packages/xx/yy/foobar-1.2.3.tar.gz";
|
|
|
|
const interceptor = pipInterceptorForUrl(url);
|
|
assert.ok(interceptor, "Interceptor should be created");
|
|
|
|
await interceptor.handleRequest(url);
|
|
|
|
assert.deepEqual(lastPackage, {
|
|
packageName: "foobar",
|
|
version: "1.2.3",
|
|
});
|
|
});
|
|
|
|
it("should parse wheel package from custom registry URL", async () => {
|
|
customRegistries = ["private-pypi.internal.com"];
|
|
const url =
|
|
"https://private-pypi.internal.com/packages/foo_bar-2.0.0-py3-none-any.whl";
|
|
|
|
const interceptor = pipInterceptorForUrl(url);
|
|
assert.ok(interceptor, "Interceptor should be created");
|
|
|
|
await interceptor.handleRequest(url);
|
|
|
|
assert.deepEqual(lastPackage, {
|
|
packageName: "foo-bar",
|
|
version: "2.0.0",
|
|
});
|
|
});
|
|
|
|
it("should handle multiple custom registries", async () => {
|
|
customRegistries = [
|
|
"registry-one.example.com",
|
|
"registry-two.example.com",
|
|
];
|
|
|
|
const url1 =
|
|
"https://registry-one.example.com/packages/package1-1.0.0.tar.gz";
|
|
const url2 =
|
|
"https://registry-two.example.com/packages/package2-2.0.0.tar.gz";
|
|
|
|
const interceptor1 = pipInterceptorForUrl(url1);
|
|
const interceptor2 = pipInterceptorForUrl(url2);
|
|
|
|
assert.ok(interceptor1, "Interceptor should be created for first registry");
|
|
assert.ok(
|
|
interceptor2,
|
|
"Interceptor should be created for second registry"
|
|
);
|
|
});
|
|
|
|
it("should block malicious package from custom registry", async () => {
|
|
customRegistries = ["my-custom-registry.example.com"];
|
|
malwareResponse = true;
|
|
|
|
const url =
|
|
"https://my-custom-registry.example.com/packages/malicious_package-1.0.0.tar.gz";
|
|
|
|
const interceptor = pipInterceptorForUrl(url);
|
|
assert.ok(interceptor, "Interceptor should be created");
|
|
|
|
const result = await interceptor.handleRequest(url);
|
|
|
|
assert.ok(result.blockResponse, "Should contain a blockResponse");
|
|
assert.equal(
|
|
result.blockResponse.statusCode,
|
|
403,
|
|
"Block response should have status code 403"
|
|
);
|
|
assert.equal(
|
|
result.blockResponse.message,
|
|
"Forbidden - blocked by safe-chain",
|
|
"Block response should have correct status message"
|
|
);
|
|
|
|
malwareResponse = false;
|
|
});
|
|
|
|
it("should still work with known registries when custom registries are set", async () => {
|
|
customRegistries = ["my-custom-registry.example.com"];
|
|
|
|
const url =
|
|
"https://files.pythonhosted.org/packages/xx/yy/foobar-1.2.3.tar.gz";
|
|
|
|
const interceptor = pipInterceptorForUrl(url);
|
|
|
|
assert.ok(
|
|
interceptor,
|
|
"Interceptor should be created for known registry even with custom registries set"
|
|
);
|
|
|
|
await interceptor.handleRequest(url);
|
|
|
|
assert.deepEqual(lastPackage, {
|
|
packageName: "foobar",
|
|
version: "1.2.3",
|
|
});
|
|
});
|
|
|
|
it("should not create interceptor for unknown registry when custom registries are set", () => {
|
|
customRegistries = ["my-custom-registry.example.com"];
|
|
const url = "https://unknown-registry.example.com/packages/foobar-1.0.0.tar.gz";
|
|
|
|
const interceptor = pipInterceptorForUrl(url);
|
|
|
|
assert.equal(
|
|
interceptor,
|
|
undefined,
|
|
"Interceptor should be undefined for unknown registry"
|
|
);
|
|
});
|
|
|
|
it("should handle empty custom registries array", () => {
|
|
customRegistries = [];
|
|
const url =
|
|
"https://my-custom-registry.example.com/packages/foobar-1.0.0.tar.gz";
|
|
|
|
const interceptor = pipInterceptorForUrl(url);
|
|
|
|
assert.equal(
|
|
interceptor,
|
|
undefined,
|
|
"Interceptor should be undefined when no custom registries are configured"
|
|
);
|
|
});
|
|
|
|
it("should parse .whl.metadata from custom registry", async () => {
|
|
customRegistries = ["private-pypi.internal.com"];
|
|
const url =
|
|
"https://private-pypi.internal.com/packages/foo_bar-2.0.0-py3-none-any.whl.metadata";
|
|
|
|
const interceptor = pipInterceptorForUrl(url);
|
|
assert.ok(interceptor, "Interceptor should be created");
|
|
|
|
await interceptor.handleRequest(url);
|
|
|
|
assert.deepEqual(lastPackage, {
|
|
packageName: "foo-bar",
|
|
version: "2.0.0",
|
|
});
|
|
});
|
|
|
|
it("should parse .tar.gz.metadata from custom registry", async () => {
|
|
customRegistries = ["private-pypi.internal.com"];
|
|
const url =
|
|
"https://private-pypi.internal.com/packages/foo_bar-2.0.0.tar.gz.metadata";
|
|
|
|
const interceptor = pipInterceptorForUrl(url);
|
|
assert.ok(interceptor, "Interceptor should be created");
|
|
|
|
await interceptor.handleRequest(url);
|
|
|
|
assert.deepEqual(lastPackage, {
|
|
packageName: "foo-bar",
|
|
version: "2.0.0",
|
|
});
|
|
});
|
|
});
|
|
|