feat: allow python custom registries configuration

This commit is contained in:
galargh 2025-12-10 13:27:18 +01:00
parent 9444c7b4f6
commit 833fa285aa
5 changed files with 259 additions and 3 deletions

View file

@ -179,6 +179,25 @@ You can set the minimum package age through multiple sources (in order of priori
} }
``` ```
## Custom Registries
By default, Safe Chain monitors downloads from the official package registries (npm registry, PyPI, etc.). If you use a private or custom package registry, you can configure Safe Chain to also monitor downloads from those registries.
⚠️ This feature **currently only applies to Python package managers** (pip, pip3, uv, poetry) and does not apply to npm-based package managers.
### Configuration Options
You can set custom registries through the following source:
1. **Environment Variable**:
```shell
export SAFE_CHAIN_PIP_CUSTOM_REGISTRIES=my-custom-registry.example.com,private-pypi.internal.com
pip install mypackage
```
Use a comma-separated list of registry hostnames to monitor multiple custom registries.
# Usage in CI/CD # Usage in CI/CD
You can protect your CI/CD pipelines from malicious packages by integrating Aikido Safe Chain into your build process. This ensures that any packages installed during your automated builds are checked for malware before installation. You can protect your CI/CD pipelines from malicious packages by integrating Aikido Safe Chain into your build process. This ensures that any packages installed during your automated builds are checked for malware before installation.

View file

@ -5,3 +5,11 @@
export function getMinimumPackageAgeHours() { export function getMinimumPackageAgeHours() {
return process.env.SAFE_CHAIN_MINIMUM_PACKAGE_AGE_HOURS; return process.env.SAFE_CHAIN_MINIMUM_PACKAGE_AGE_HOURS;
} }
/**
* Gets the custom pip registries from environment variable
* @returns {string | undefined}
*/
export function getPipCustomRegistries() {
return process.env.SAFE_CHAIN_PIP_CUSTOM_REGISTRIES;
}

View file

@ -98,3 +98,30 @@ export function skipMinimumPackageAge() {
return defaultSkipMinimumPackageAge; return defaultSkipMinimumPackageAge;
} }
/** @type {string[]} */
const defaultPipCustomRegistries = [];
/** @returns {string[]} */
export function getPipCustomRegistries() {
// Priority 1: Environment variable
const envValue = validatePipCustomRegistries(
environmentVariables.getPipCustomRegistries()
);
if (envValue !== undefined) {
return envValue;
}
return defaultPipCustomRegistries;
}
/**
* @param {string | undefined} value
* @returns {string[] | undefined}
*/
function validatePipCustomRegistries(value) {
if (!value) {
return undefined;
}
return value.split(",");
}

View file

@ -1,3 +1,4 @@
import { getPipCustomRegistries } from "../../config/settings.js";
import { isMalwarePackage } from "../../scanning/audit/index.js"; import { isMalwarePackage } from "../../scanning/audit/index.js";
import { interceptRequests } from "./interceptorBuilder.js"; import { interceptRequests } from "./interceptorBuilder.js";
@ -13,7 +14,9 @@ const knownPipRegistries = [
* @returns {import("./interceptorBuilder.js").Interceptor | undefined} * @returns {import("./interceptorBuilder.js").Interceptor | undefined}
*/ */
export function pipInterceptorForUrl(url) { export function pipInterceptorForUrl(url) {
const registry = knownPipRegistries.find((reg) => url.includes(reg)); const customRegistries = getPipCustomRegistries();
const registries = [...knownPipRegistries, ...customRegistries];
const registry = registries.find((reg) => url.includes(reg));
if (registry) { if (registry) {
return buildPipInterceptor(registry); return buildPipInterceptor(registry);
@ -37,8 +40,8 @@ function buildPipInterceptor(registry) {
// Per python, packages that differ only by hyphen vs underscore are considered the same. // Per python, packages that differ only by hyphen vs underscore are considered the same.
const hyphenName = packageName?.includes("_") ? packageName.replace(/_/g, "-") : packageName; const hyphenName = packageName?.includes("_") ? packageName.replace(/_/g, "-") : packageName;
const isMalicious = const isMalicious =
await isMalwarePackage(packageName, version) await isMalwarePackage(packageName, version)
|| await isMalwarePackage(hyphenName, version); || await isMalwarePackage(hyphenName, version);
if (isMalicious) { if (isMalicious) {

View file

@ -0,0 +1,199 @@
import { describe, it, mock } from "node:test";
import assert from "node:assert";
describe("pipInterceptor custom registries", async () => {
let lastPackage;
let malwareResponse = false;
let customRegistries = [];
mock.module("../../config/settings.js", {
namedExports: {
getPipCustomRegistries: () => customRegistries,
},
});
mock.module("../../scanning/audit/index.js", {
namedExports: {
isMalwarePackage: async (packageName, version) => {
lastPackage = { packageName, version };
return malwareResponse;
},
},
});
const { pipInterceptorForUrl } = await import("./pipInterceptor.js");
it("should create interceptor for custom registry", () => {
customRegistries = ["my-custom-registry.example.com"];
const url =
"https://my-custom-registry.example.com/packages/xx/yy/foobar-1.2.3.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.ok(
interceptor,
"Interceptor should be created for custom registry"
);
});
it("should parse package from custom registry URL", async () => {
customRegistries = ["my-custom-registry.example.com"];
const url =
"https://my-custom-registry.example.com/packages/xx/yy/foobar-1.2.3.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foobar",
version: "1.2.3",
});
});
it("should parse wheel package from custom registry URL", async () => {
customRegistries = ["private-pypi.internal.com"];
const url =
"https://private-pypi.internal.com/packages/foo_bar-2.0.0-py3-none-any.whl";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foo-bar",
version: "2.0.0",
});
});
it("should handle multiple custom registries", async () => {
customRegistries = [
"registry-one.example.com",
"registry-two.example.com",
];
const url1 =
"https://registry-one.example.com/packages/package1-1.0.0.tar.gz";
const url2 =
"https://registry-two.example.com/packages/package2-2.0.0.tar.gz";
const interceptor1 = pipInterceptorForUrl(url1);
const interceptor2 = pipInterceptorForUrl(url2);
assert.ok(interceptor1, "Interceptor should be created for first registry");
assert.ok(
interceptor2,
"Interceptor should be created for second registry"
);
});
it("should block malicious package from custom registry", async () => {
customRegistries = ["my-custom-registry.example.com"];
malwareResponse = true;
const url =
"https://my-custom-registry.example.com/packages/malicious_package-1.0.0.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
const result = await interceptor.handleRequest(url);
assert.ok(result.blockResponse, "Should contain a blockResponse");
assert.equal(
result.blockResponse.statusCode,
403,
"Block response should have status code 403"
);
assert.equal(
result.blockResponse.message,
"Forbidden - blocked by safe-chain",
"Block response should have correct status message"
);
malwareResponse = false;
});
it("should still work with known registries when custom registries are set", async () => {
customRegistries = ["my-custom-registry.example.com"];
const url =
"https://files.pythonhosted.org/packages/xx/yy/foobar-1.2.3.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.ok(
interceptor,
"Interceptor should be created for known registry even with custom registries set"
);
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foobar",
version: "1.2.3",
});
});
it("should not create interceptor for unknown registry when custom registries are set", () => {
customRegistries = ["my-custom-registry.example.com"];
const url = "https://unknown-registry.example.com/packages/foobar-1.0.0.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.equal(
interceptor,
undefined,
"Interceptor should be undefined for unknown registry"
);
});
it("should handle empty custom registries array", () => {
customRegistries = [];
const url =
"https://my-custom-registry.example.com/packages/foobar-1.0.0.tar.gz";
const interceptor = pipInterceptorForUrl(url);
assert.equal(
interceptor,
undefined,
"Interceptor should be undefined when no custom registries are configured"
);
});
it("should parse .whl.metadata from custom registry", async () => {
customRegistries = ["private-pypi.internal.com"];
const url =
"https://private-pypi.internal.com/packages/foo_bar-2.0.0-py3-none-any.whl.metadata";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foo-bar",
version: "2.0.0",
});
});
it("should parse .tar.gz.metadata from custom registry", async () => {
customRegistries = ["private-pypi.internal.com"];
const url =
"https://private-pypi.internal.com/packages/foo_bar-2.0.0.tar.gz.metadata";
const interceptor = pipInterceptorForUrl(url);
assert.ok(interceptor, "Interceptor should be created");
await interceptor.handleRequest(url);
assert.deepEqual(lastPackage, {
packageName: "foo-bar",
version: "2.0.0",
});
});
});