Merge pull request #232 from galargh/pip-custom-registries

feat: allow python custom registries configuration
This commit is contained in:
Sander Declerck 2026-01-05 14:01:51 +01:00 committed by GitHub
commit 8bfbe1c77d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 579 additions and 284 deletions

View file

@ -1,3 +1,4 @@
import { getPipCustomRegistries } from "../../config/settings.js";
import { isMalwarePackage } from "../../scanning/audit/index.js";
import { interceptRequests } from "./interceptorBuilder.js";
@ -13,7 +14,9 @@ const knownPipRegistries = [
* @returns {import("./interceptorBuilder.js").Interceptor | undefined}
*/
export function pipInterceptorForUrl(url) {
const registry = knownPipRegistries.find((reg) => url.includes(reg));
const customRegistries = getPipCustomRegistries();
const registries = [...knownPipRegistries, ...customRegistries];
const registry = registries.find((reg) => url.includes(reg));
if (registry) {
return buildPipInterceptor(registry);
@ -37,8 +40,8 @@ function buildPipInterceptor(registry) {
// Per python, packages that differ only by hyphen vs underscore are considered the same.
const hyphenName = packageName?.includes("_") ? packageName.replace(/_/g, "-") : packageName;
const isMalicious =
await isMalwarePackage(packageName, version)
const isMalicious =
await isMalwarePackage(packageName, version)
|| await isMalwarePackage(hyphenName, version);
if (isMalicious) {

View file

@ -0,0 +1,199 @@
import { describe, it, mock } from "node:test";
import assert from "node:assert";
describe("pipInterceptor custom registries", async () => {
let lastPackage;
let malwareResponse = false;
let customRegistries = [];
mock.module("../../config/settings.js", {
namedExports: {
getPipCustomRegistries: () => customRegistries,
},
});
mock.module("../../scanning/audit/index.js", {
namedExports: {
isMalwarePackage: async (packageName, version) => {
lastPackage = { packageName, version };
return malwareResponse;
},
},
});
const { pipInterceptorForUrl } = await import("./pipInterceptor.js");
it("should create interceptor for custom registry", () => {
customRegistries = ["my-custom-registry.example.com"];
const url =
"https://my-custom-registry.example.com/packages/xx/yy/foobar-1.2.3.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.ok(
interceptor,
"Interceptor should be created for custom registry"
);
});
it("should parse package from custom registry URL", async () => {
customRegistries = ["my-custom-registry.example.com"];
const url =
"https://my-custom-registry.example.com/packages/xx/yy/foobar-1.2.3.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foobar",
version: "1.2.3",
});
});
it("should parse wheel package from custom registry URL", async () => {
customRegistries = ["private-pypi.internal.com"];
const url =
"https://private-pypi.internal.com/packages/foo_bar-2.0.0-py3-none-any.whl";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foo-bar",
version: "2.0.0",
});
});
it("should handle multiple custom registries", async () => {
customRegistries = [
"registry-one.example.com",
"registry-two.example.com",
];
const url1 =
"https://registry-one.example.com/packages/package1-1.0.0.tar.gz";
const url2 =
"https://registry-two.example.com/packages/package2-2.0.0.tar.gz";
const interceptor1 = pipInterceptorForUrl(url1);
const interceptor2 = pipInterceptorForUrl(url2);
assert.ok(interceptor1, "Interceptor should be created for first registry");
assert.ok(
interceptor2,
"Interceptor should be created for second registry"
);
});
it("should block malicious package from custom registry", async () => {
customRegistries = ["my-custom-registry.example.com"];
malwareResponse = true;
const url =
"https://my-custom-registry.example.com/packages/malicious_package-1.0.0.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
const result = await interceptor.handleRequest(url);
assert.ok(result.blockResponse, "Should contain a blockResponse");
assert.equal(
result.blockResponse.statusCode,
403,
"Block response should have status code 403"
);
assert.equal(
result.blockResponse.message,
"Forbidden - blocked by safe-chain",
"Block response should have correct status message"
);
malwareResponse = false;
});
it("should still work with known registries when custom registries are set", async () => {
customRegistries = ["my-custom-registry.example.com"];
const url =
"https://files.pythonhosted.org/packages/xx/yy/foobar-1.2.3.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.ok(
interceptor,
"Interceptor should be created for known registry even with custom registries set"
);
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foobar",
version: "1.2.3",
});
});
it("should not create interceptor for unknown registry when custom registries are set", () => {
customRegistries = ["my-custom-registry.example.com"];
const url = "https://unknown-registry.example.com/packages/foobar-1.0.0.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.equal(
interceptor,
undefined,
"Interceptor should be undefined for unknown registry"
);
});
it("should handle empty custom registries array", () => {
customRegistries = [];
const url =
"https://my-custom-registry.example.com/packages/foobar-1.0.0.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.equal(
interceptor,
undefined,
"Interceptor should be undefined when no custom registries are configured"
);
});
it("should parse .whl.metadata from custom registry", async () => {
customRegistries = ["private-pypi.internal.com"];
const url =
"https://private-pypi.internal.com/packages/foo_bar-2.0.0-py3-none-any.whl.metadata";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foo-bar",
version: "2.0.0",
});
});
it("should parse .tar.gz.metadata from custom registry", async () => {
customRegistries = ["private-pypi.internal.com"];
const url =
"https://private-pypi.internal.com/packages/foo_bar-2.0.0.tar.gz.metadata";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foo-bar",
version: "2.0.0",
});
});
});