From 219e7d3942c0b15cf66fe25f50b3fa01eb7a9bea Mon Sep 17 00:00:00 2001 From: Peter Lehnhardt Date: Thu, 30 Oct 2025 11:39:47 +0100 Subject: [PATCH] Implement Config APIs This change implements `describeConfigs`, `alterConfigs`, and `incrementalAlterConfigs` on the admin client. on-behalf-of: @SAP ospo@sap.com --- docs/admin.md | 40 + docs/diagnostic.md | 41 +- playground/apis/admin/config.ts | 4 +- src/apis/admin/alter-configs-v2.ts | 7 +- src/apis/admin/describe-configs-v4.ts | 37 +- .../admin/incremental-alter-configs-v1.ts | 30 +- src/apis/enumerations.ts | 33 +- src/clients/admin/admin.ts | 335 ++- src/clients/admin/options.ts | 128 +- src/clients/admin/types.ts | 30 +- src/diagnostic.ts | 1 + test/apis/admin/alter-configs-v2.test.ts | 176 +- test/apis/admin/describe-configs-v2.test.ts | 64 +- test/apis/admin/describe-configs-v3.test.ts | 64 +- test/apis/admin/describe-configs-v4.test.ts | 101 +- .../incremental-alter-configs-v1.test.ts | 221 +- test/clients/admin/admin.test.ts | 2069 ++++++++++++++++- 17 files changed, 3033 insertions(+), 348 deletions(-) diff --git a/docs/admin.md b/docs/admin.md index aa6e325..176f91f 100644 --- a/docs/admin.md +++ b/docs/admin.md @@ -128,6 +128,46 @@ Options: | -------- | ------------------------------- | -------------------------------------------------------------------------------- | | topics | `DescribeLogDirsRequestTopic[]` | Array of topics specifying the topics and partitions for which to describe logs. | +### `describeConfigs(options[, callback])` + +Describes configuration parameters for specified resources. + +The return value is an array of resource configurations, each containing the resource type, name, and configuration entries. + +Options: + +| Property | Type | Description | +| -------------------- | ---------------------------------- | ------------------------------------------------------------------------------------------ | +| resources | `DescribeConfigsRequestResource[]` | Array of resources specifying the resource type, name, and configuration keys to describe. | +| includeSynonyms | `boolean` | Whether to include configuration synonyms in the response. Defaults to `false`. | +| includeDocumentation | `boolean` | Whether to include configuration documentation in the response. Defaults to `false`. | + +### `alterConfigs(options[, callback])` + +Alters configuration parameters for specified resources. + +The return value is `void`. + +Options: + +| Property | Type | Description | +| ------------ | ------------------------------- | ------------------------------------------------------------------------------------ | +| resources | `AlterConfigsRequestResource[]` | Array of resources specifying the resource type, name, and configurations to change. | +| validateOnly | `boolean` | Whether to only validate the request without applying changes. Defaults to `false`. | + +### `incrementalAlterConfigs(options[, callback])` + +Incrementally alters configuration parameters for specified resources using specific operations (Set, delete, append, subract). + +The return value is `void`. + +Options: + +| Property | Type | Description | +| ------------ | ------------------------------------------ | -------------------------------------------------------------------------------------- | +| resources | `IncrementalAlterConfigsRequestResource[]` | Array of resources specifying the resource type, name, and incremental configurations. | +| validateOnly | `boolean` | Whether to only validate the request without applying changes. Defaults to `false`. | + ### `close([callback])` Closes the admin and all its connections. diff --git a/docs/diagnostic.md b/docs/diagnostic.md index 3f16782..d65cfd2 100644 --- a/docs/diagnostic.md +++ b/docs/diagnostic.md @@ -62,23 +62,24 @@ Each tracing channel publishes events with the following common properties: ## Published tracing channels -| Name | Target | Description | -| ----------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------- | -| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. | -| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. | -| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. | -| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. | -| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. | -| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. | -| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. | -| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. | -| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. | -| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. | -| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. | -| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. | -| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. | -| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. | -| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. | -| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. | -| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. | -| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. | +| Name | Target | Description | +| ----------------------------------- | ---------------- | --------------------------------------------------------------------------------------------------- | +| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. | +| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. | +| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. | +| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. | +| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. | +| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. | +| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. | +| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. | +| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. | +| `plt:kafka:admin:configs` | `Admin` | Traces a `Admin.describeConfigs`, `Admin.alterConfigs`, or `Admin.incrementalAlterConfigs` request. | +| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. | +| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. | +| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. | +| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. | +| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. | +| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. | +| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. | +| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. | +| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. | diff --git a/playground/apis/admin/config.ts b/playground/apis/admin/config.ts index 919b69c..ed20406 100644 --- a/playground/apis/admin/config.ts +++ b/playground/apis/admin/config.ts @@ -1,7 +1,7 @@ import { api as alterConfigsV2 } from '../../../src/apis/admin/alter-configs-v2.ts' import { api as describeConfigsV4 } from '../../../src/apis/admin/describe-configs-v4.ts' import { api as incrementalAlterConfigsV1 } from '../../../src/apis/admin/incremental-alter-configs-v1.ts' -import { IncrementalAlterConfigTypes, ResourceTypes } from '../../../src/apis/enumerations.ts' +import { IncrementalAlterConfigOperationTypes, ResourceTypes } from '../../../src/apis/enumerations.ts' import { Connection } from '../../../src/network/connection.ts' import { performAPICallWithRetry } from '../../utils.ts' @@ -50,7 +50,7 @@ await performAPICallWithRetry('IncrementalAlterConfig', () => configs: [ { name: 'compression.type', - configOperation: IncrementalAlterConfigTypes.SET, + configOperation: IncrementalAlterConfigOperationTypes.SET, value: 'gzip' } ] diff --git a/src/apis/admin/alter-configs-v2.ts b/src/apis/admin/alter-configs-v2.ts index 66a77ed..c088605 100644 --- a/src/apis/admin/alter-configs-v2.ts +++ b/src/apis/admin/alter-configs-v2.ts @@ -3,6 +3,7 @@ import { type NullableString } from '../../protocol/definitions.ts' import { type Reader } from '../../protocol/reader.ts' import { Writer } from '../../protocol/writer.ts' import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts' +import { type ConfigResource } from '../enumerations.ts' export interface AlterConfigsRequestConfig { name: string @@ -10,7 +11,7 @@ export interface AlterConfigsRequestConfig { } export interface AlterConfigsRequestResource { - resourceType: number + resourceType: ConfigResource resourceName: string configs: AlterConfigsRequestConfig[] } @@ -20,7 +21,7 @@ export type AlterConfigsRequest = Parameters export interface AlterConfigsResponseResult { errorCode: number errorMessage: NullableString - resourceType: number + resourceType: ConfigResource resourceName: string } @@ -81,7 +82,7 @@ export function parseResponse ( return { errorCode, errorMessage: r.readNullableString(), - resourceType: r.readInt8(), + resourceType: r.readInt8() as ConfigResource, resourceName: r.readString() } }) diff --git a/src/apis/admin/describe-configs-v4.ts b/src/apis/admin/describe-configs-v4.ts index 9c56d76..5c05b67 100644 --- a/src/apis/admin/describe-configs-v4.ts +++ b/src/apis/admin/describe-configs-v4.ts @@ -3,11 +3,12 @@ import { type NullableString } from '../../protocol/definitions.ts' import { type Reader } from '../../protocol/reader.ts' import { Writer } from '../../protocol/writer.ts' import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts' +import { type ConfigSource, type ConfigResource, type ConfigType } from '../enumerations.ts' export interface DescribeConfigsRequestResource { - resourceType: number + resourceType: ConfigResource resourceName: string - configurationKeys: string[] + configurationKeys?: string[] | null | undefined } export type DescribeConfigsRequest = Parameters @@ -15,24 +16,24 @@ export type DescribeConfigsRequest = Parameters export interface DescribeConfigsResponseSynonym { name: string value: NullableString - source: number + source: ConfigSource } export interface DescribeConfigsResponseConfig { name: string value: NullableString readOnly: boolean - configSource: number + configSource: ConfigSource isSensitive: boolean synonyms: DescribeConfigsResponseSynonym[] - configType: number + configType: ConfigType documentation: NullableString } export interface DescribeConfigsResponseResult { errorCode: number errorMessage: NullableString - resourceType: number + resourceType: ConfigResource resourceName: string configs: DescribeConfigsResponseConfig[] } @@ -108,27 +109,23 @@ export function parseResponse ( return { errorCode, errorMessage: r.readNullableString(), - resourceType: r.readInt8(), + resourceType: r.readInt8() as ConfigResource, resourceName: r.readString(), configs: r.readArray(r => { return { name: r.readString(), value: r.readNullableString(), readOnly: r.readBoolean(), - configSource: r.readInt8(), + configSource: r.readInt8() as ConfigSource, isSensitive: r.readBoolean(), - synonyms: r.readArray( - r => { - return { - name: r.readString(), - value: r.readNullableString(), - source: r.readInt8() - } - }, - true, - false - ), - configType: r.readInt8(), + synonyms: r.readArray(r => { + return { + name: r.readString(), + value: r.readNullableString(), + source: r.readInt8() as ConfigSource + } + }), + configType: r.readInt8() as ConfigType, documentation: r.readNullableString() } }) diff --git a/src/apis/admin/incremental-alter-configs-v1.ts b/src/apis/admin/incremental-alter-configs-v1.ts index cd34044..ef199eb 100644 --- a/src/apis/admin/incremental-alter-configs-v1.ts +++ b/src/apis/admin/incremental-alter-configs-v1.ts @@ -3,15 +3,29 @@ import { type NullableString } from '../../protocol/definitions.ts' import { type Reader } from '../../protocol/reader.ts' import { Writer } from '../../protocol/writer.ts' import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts' +import { + type IncrementalAlterConfigOperationTypes, + type ConfigResource, + type IncrementalAlterConfigOperationType +} from '../enumerations.ts' -export interface IncrementalAlterConfigsRequestConfig { +interface IncrementalAlterConfigsDeletionRequestConfig { name: string - configOperation: number - value?: NullableString + configOperation: typeof IncrementalAlterConfigOperationTypes.DELETE } +interface IncrementalAlterConfigsModificationRequestConfig { + name: string + configOperation: Exclude + value: string +} + +export type IncrementalAlterConfigsRequestConfig = + | IncrementalAlterConfigsModificationRequestConfig + | IncrementalAlterConfigsDeletionRequestConfig + export interface IncrementalAlterConfigsRequestResource { - resourceType: number + resourceType: ConfigResource resourceName: string configs: IncrementalAlterConfigsRequestConfig[] } @@ -21,7 +35,7 @@ export type IncrementalAlterConfigsRequest = Parameters export interface IncrementalAlterConfigsResponseResult { errorCode: number errorMessage: NullableString - resourceType: number + resourceType: ConfigResource resourceName: string } @@ -47,7 +61,9 @@ export function createRequest (resources: IncrementalAlterConfigsRequestResource w.appendInt8(r.resourceType) .appendString(r.resourceName) .appendArray(r.configs, (w, r) => { - w.appendString(r.name).appendInt8(r.configOperation).appendString(r.value) + w.appendString(r.name) + .appendInt8(r.configOperation) + .appendString((r as IncrementalAlterConfigsModificationRequestConfig).value) }) }) .appendBoolean(validateOnly) @@ -83,7 +99,7 @@ export function parseResponse ( return { errorCode, errorMessage: r.readNullableString(), - resourceType: r.readInt8(), + resourceType: r.readInt8() as ConfigResource, resourceName: r.readString() } }) diff --git a/src/apis/enumerations.ts b/src/apis/enumerations.ts index fb31983..71e9c19 100644 --- a/src/apis/enumerations.ts +++ b/src/apis/enumerations.ts @@ -83,20 +83,39 @@ export const ConfigSources = { DYNAMIC_DEFAULT_BROKER_CONFIG: 3, STATIC_BROKER_CONFIG: 4, DEFAULT_CONFIG: 5, - DYNAMIC_BROKER_LOGGER_CONFIG: 6 + DYNAMIC_BROKER_LOGGER_CONFIG: 6, + CLIENT_METRICS_CONFIG: 7, + GROUP_CONFIG: 8 } as const -export type ConfigSource = keyof typeof ConfigSources +export type ConfigSource = (typeof ConfigSources)[keyof typeof ConfigSources] -export const ConfigTypes = { +export const ConfigResources = { UNKNOWN: 0, TOPIC: 2, BROKER: 4, - BROKER_LOGGER: 8 + BROKER_LOGGER: 8, + CLIENT_METRICS: 16, + GROUP: 32 +} as const +export type ConfigResource = (typeof ConfigResources)[keyof typeof ConfigResources] + +export const ConfigTypes = { + UNKNOWN: 0, + BOOLEAN: 1, + STRING: 2, + INT: 3, + SHORT: 4, + LONG: 5, + DOUBLE: 6, + LIST: 7, + CLASS: 8, + PASSWORD: 9 } as const -export type ConfigType = keyof typeof ConfigTypes +export type ConfigType = (typeof ConfigTypes)[keyof typeof ConfigTypes] -export const IncrementalAlterConfigTypes = { SET: 0, DELETE: 1, APPEND: 2, SUBTRACT: 3 } -export type IncrementalAlterConfigType = keyof typeof IncrementalAlterConfigTypes +export const IncrementalAlterConfigOperationTypes = { SET: 0, DELETE: 1, APPEND: 2, SUBTRACT: 3 } as const +export type IncrementalAlterConfigOperationType = + (typeof IncrementalAlterConfigOperationTypes)[keyof typeof IncrementalAlterConfigOperationTypes] // ./admin/*-client-quotas.ts export const ClientQuotaMatchTypes = { EXACT: 0, DEFAULT: 1, ANY: 2 } as const diff --git a/src/clients/admin/admin.ts b/src/clients/admin/admin.ts index 1bddabd..7bc5002 100644 --- a/src/clients/admin/admin.ts +++ b/src/clients/admin/admin.ts @@ -3,12 +3,18 @@ import { type AlterClientQuotasResponse, type AlterClientQuotasResponseEntries } from '../../apis/admin/alter-client-quotas-v1.ts' +import { type AlterConfigsRequest, type AlterConfigsResponse } from '../../apis/admin/alter-configs-v2.ts' import { type CreateTopicsRequest, type CreateTopicsRequestTopic, type CreateTopicsRequestTopicAssignment, type CreateTopicsResponse } from '../../apis/admin/create-topics-v7.ts' +import { type DescribeConfigsRequest, type DescribeConfigsResponse } from '../../apis/admin/describe-configs-v4.ts' +import { + type IncrementalAlterConfigsRequest, + type IncrementalAlterConfigsResponse +} from '../../apis/admin/incremental-alter-configs-v1.ts' import { type DeleteGroupsRequest, type DeleteGroupsResponse } from '../../apis/admin/delete-groups-v2.ts' import { type DeleteTopicsRequest, @@ -34,11 +40,17 @@ import { type CallbackWithPromise } from '../../apis/callbacks.ts' import { type Callback } from '../../apis/definitions.ts' -import { FindCoordinatorKeyTypes, type ConsumerGroupState } from '../../apis/enumerations.ts' +import { + type ConfigResource, + ConfigResources, + FindCoordinatorKeyTypes, + type ConsumerGroupState +} from '../../apis/enumerations.ts' import { type FindCoordinatorRequest, type FindCoordinatorResponse } from '../../apis/metadata/find-coordinator-v6.ts' import { type MetadataRequest, type MetadataResponse } from '../../apis/metadata/metadata-v12.ts' import { adminClientQuotasChannel, + adminConfigsChannel, adminGroupsChannel, adminLogDirsChannel, adminTopicsChannel, @@ -63,32 +75,40 @@ import { type BaseOptions } from '../base/types.ts' import { type GroupAssignment } from '../consumer/types.ts' import { alterClientQuotasOptionsValidator, + alterConfigsOptionsValidator, createTopicsOptionsValidator, deleteGroupsOptionsValidator, deleteTopicsOptionsValidator, describeClientQuotasOptionsValidator, + describeConfigsOptionsValidator, describeGroupsOptionsValidator, describeLogDirsOptionsValidator, + incrementalAlterConfigsOptionsValidator, listGroupsOptionsValidator, listTopicsOptionsValidator } from './options.ts' import { type AdminOptions, type AlterClientQuotasOptions, + type AlterConfigsOptions, type BrokerLogDirDescription, type CreatedTopic, type CreateTopicsOptions, type DeleteGroupsOptions, type DeleteTopicsOptions, type DescribeClientQuotasOptions, + type DescribeConfigsOptions, type DescribeGroupsOptions, type DescribeLogDirsOptions, type Group, type GroupBase, type GroupMember, + type IncrementalAlterConfigsOptions, type ListGroupsOptions, - type ListTopicsOptions + type ListTopicsOptions, + type ConfigDescription } from './types.ts' +import { type Connection } from '../../index.ts' export class Admin extends Base { constructor (options: AdminOptions) { @@ -392,6 +412,99 @@ export class Admin extends Base { return callback[kCallbackPromise] } + describeConfigs (options: DescribeConfigsOptions, callback: CallbackWithPromise): void + describeConfigs (options: DescribeConfigsOptions): Promise + describeConfigs ( + options: DescribeConfigsOptions, + callback?: CallbackWithPromise + ): void | Promise { + if (!callback) { + callback = createPromisifiedCallback() + } + + if (this[kCheckNotClosed](callback)) { + return callback[kCallbackPromise] + } + + const validationError = this[kValidateOptions](options, describeConfigsOptionsValidator, '/options', false) + if (validationError) { + callback(validationError, undefined as unknown as ConfigDescription[]) + return callback[kCallbackPromise] + } + + adminConfigsChannel.traceCallback( + this.#describeConfigs, + 1, + createDiagnosticContext({ client: this, operation: 'describeConfigs', options }), + this, + options, + callback + ) + + return callback[kCallbackPromise] + } + + alterConfigs (options: AlterConfigsOptions, callback: CallbackWithPromise): void + alterConfigs (options: AlterConfigsOptions): Promise + alterConfigs (options: AlterConfigsOptions, callback?: CallbackWithPromise): void | Promise { + if (!callback) { + callback = createPromisifiedCallback() + } + + if (this[kCheckNotClosed](callback)) { + return callback[kCallbackPromise] + } + + const validationError = this[kValidateOptions](options, alterConfigsOptionsValidator, '/options', false) + if (validationError) { + callback(validationError, undefined) + return callback[kCallbackPromise] + } + + adminConfigsChannel.traceCallback( + this.#alterConfigs, + 1, + createDiagnosticContext({ client: this, operation: 'alterConfigs', options }), + this, + options, + callback + ) + + return callback[kCallbackPromise] + } + + incrementalAlterConfigs (options: IncrementalAlterConfigsOptions, callback: CallbackWithPromise): void + incrementalAlterConfigs (options: IncrementalAlterConfigsOptions): Promise + incrementalAlterConfigs ( + options: IncrementalAlterConfigsOptions, + callback?: CallbackWithPromise + ): void | Promise { + if (!callback) { + callback = createPromisifiedCallback() + } + + if (this[kCheckNotClosed](callback)) { + return callback[kCallbackPromise] + } + + const validationError = this[kValidateOptions](options, incrementalAlterConfigsOptionsValidator, '/options', false) + if (validationError) { + callback(validationError, undefined) + return callback[kCallbackPromise] + } + + adminConfigsChannel.traceCallback( + this.#incrementalAlterConfigs, + 1, + createDiagnosticContext({ client: this, operation: 'incrementalAlterConfigs', options }), + this, + options, + callback + ) + + return callback[kCallbackPromise] + } + #listTopics (options: ListTopicsOptions, callback: CallbackWithPromise): void { const includeInternals = options.includeInternals ?? false @@ -987,4 +1100,222 @@ export class Admin extends Base { ) }) } + + #getConfigRequestsDistributedToBrokers ( + resources: T[] + ): Map { + const brokerConfigResourceMap = new Map() + for (const resource of resources) { + if (resource.resourceType === ConfigResources.BROKER || resource.resourceType === ConfigResources.BROKER_LOGGER) { + if (!brokerConfigResourceMap.has(Number(resource.resourceName))) { + brokerConfigResourceMap.set(Number(resource.resourceName), []) + } + brokerConfigResourceMap.get(Number(resource.resourceName))!.push(resource) + } else { + if (!brokerConfigResourceMap.has(-1)) { + brokerConfigResourceMap.set(-1, []) + } + brokerConfigResourceMap.get(-1)!.push(resource) + } + } + return brokerConfigResourceMap + } + + #getAnyOrSpecificBrokerConnection (brokerId: number, callback: CallbackWithPromise): void { + if (brokerId === -1) { + this[kGetBootstrapConnection](callback) + return + } + + this[kMetadata]({ topics: [] }, (error, metadata) => { + if (error) { + callback(error, undefined as unknown as Connection) + return + } + + const brokerInstance = metadata.brokers.get(brokerId) + if (!brokerInstance) { + callback(new Error(`Broker with id ${brokerId} not found in cluster.`), undefined as unknown as Connection) + return + } + + this[kGetConnection](brokerInstance, callback) + }) + } + + #describeConfigs (options: DescribeConfigsOptions, callback: CallbackWithPromise): void { + runConcurrentCallbacks( + 'Describing configs failed.', + this.#getConfigRequestsDistributedToBrokers(options.resources), + ([brokerId, resources], concurrentCallback) => { + this.#describeConfigsOnBroker({ ...options, resources }, brokerId, (error, response) => { + if (error) { + concurrentCallback(error, undefined as unknown as ConfigDescription[]) + return + } + + concurrentCallback(null, response) + }) + }, + (error, results) => { + callback(error, results?.flat()) + } + ) + } + + #describeConfigsOnBroker ( + options: DescribeConfigsOptions, + broker: number | Connection, + callback: CallbackWithPromise + ): void { + if (typeof broker === 'number') { + this.#getAnyOrSpecificBrokerConnection(broker, (error, connection) => { + if (error) { + callback(error, undefined as unknown as ConfigDescription[]) + return + } + this.#describeConfigsOnBroker(options, connection, callback) + }) + return + } + + this[kPerformWithRetry]( + 'describeConfigs', + retryCallback => { + this[kGetApi]('DescribeConfigs', (error, api) => { + if (error) { + retryCallback(error, undefined as unknown as DescribeConfigsResponse) + return + } + + api( + broker, + options.resources, + options.includeSynonyms ?? false, + options.includeDocumentation ?? false, + retryCallback as unknown as Callback + ) + }) + }, + (error: Error | null, response: DescribeConfigsResponse) => { + if (error) { + callback(error, undefined as unknown as ConfigDescription[]) + return + } + + const resultsWithoutErrors = response.results.map(result => ({ + resourceType: result.resourceType, + resourceName: result.resourceName, + configs: result.configs + })) + callback(null, resultsWithoutErrors) + }, + 0 + ) + } + + #alterConfigs (options: DescribeConfigsOptions, callback: CallbackWithPromise): void { + runConcurrentCallbacks( + 'Altering configs failed.', + this.#getConfigRequestsDistributedToBrokers(options.resources), + ([brokerId, resources], concurrentCallback) => { + this.#alterConfigsOnBroker({ ...options, resources }, brokerId, concurrentCallback) + }, + error => { + callback(error) + } + ) + } + + #alterConfigsOnBroker ( + options: AlterConfigsOptions, + broker: number | Connection, + callback: CallbackWithPromise + ): void { + if (typeof broker === 'number') { + this.#getAnyOrSpecificBrokerConnection(broker, (error, connection) => { + if (error) { + callback(error) + return + } + this.#alterConfigsOnBroker(options, connection, callback) + }) + return + } + + this[kPerformWithRetry]( + 'alterConfigs', + retryCallback => { + this[kGetApi]('AlterConfigs', (error, api) => { + if (error) { + retryCallback(error) + return + } + + api( + broker, + options.resources, + options.validateOnly ?? false, + retryCallback as unknown as Callback + ) + }) + }, + callback, + 0 + ) + } + + #incrementalAlterConfigs (options: IncrementalAlterConfigsOptions, callback: CallbackWithPromise): void { + runConcurrentCallbacks( + 'Incrementally altering configs failed.', + this.#getConfigRequestsDistributedToBrokers(options.resources), + ([brokerId, resources], concurrentCallback) => { + this.#incrementalAlterConfigsOnBroker({ ...options, resources }, brokerId, concurrentCallback) + }, + error => { + callback(error) + } + ) + } + + #incrementalAlterConfigsOnBroker ( + options: IncrementalAlterConfigsOptions, + broker: number | Connection, + callback: CallbackWithPromise + ): void { + if (typeof broker === 'number') { + this.#getAnyOrSpecificBrokerConnection(broker, (error, connection) => { + if (error) { + callback(error) + return + } + this.#incrementalAlterConfigsOnBroker(options, connection, callback) + }) + return + } + + this[kPerformWithRetry]( + 'incrementalAlterConfigs', + retryCallback => { + this[kGetApi]('IncrementalAlterConfigs', ( + error, + api + ) => { + if (error) { + retryCallback(error) + return + } + + api( + broker, + options.resources, + options.validateOnly ?? false, + retryCallback as unknown as Callback + ) + }) + }, + callback, + 0 + ) + } } diff --git a/src/clients/admin/options.ts b/src/clients/admin/options.ts index 38758a8..bee9668 100644 --- a/src/clients/admin/options.ts +++ b/src/clients/admin/options.ts @@ -1,4 +1,9 @@ -import { ClientQuotaMatchTypes, ConsumerGroupStates } from '../../apis/enumerations.ts' +import { + ClientQuotaMatchTypes, + ConsumerGroupStates, + IncrementalAlterConfigOperationTypes, + ResourceTypes +} from '../../apis/enumerations.ts' import { ajv, listErrorMessage } from '../../utils.ts' import { idProperty } from '../base/options.ts' @@ -199,6 +204,124 @@ export const describeLogDirsOptionsSchema = { additionalProperties: false } +export const describeConfigsOptionsSchema = { + type: 'object', + properties: { + resources: { + type: 'array', + items: { + type: 'object', + properties: { + resourceType: { type: 'number', enum: Object.values(ResourceTypes) }, + resourceName: { type: 'string', minLength: 1 }, + configurationKeys: { + type: ['array', 'null'], + items: { type: 'string', minLength: 1 } + } + }, + required: ['resourceType', 'resourceName'], + additionalProperties: false + }, + minItems: 1 + }, + includeSynonyms: { type: 'boolean' }, + includeDocumentation: { type: 'boolean' } + }, + required: ['resources'], + additionalProperties: false +} + +export const alterConfigsOptionsSchema = { + type: 'object', + properties: { + resources: { + type: 'array', + items: { + type: 'object', + properties: { + resourceType: { type: 'number', enum: Object.values(ResourceTypes) }, + resourceName: { type: 'string', minLength: 1 }, + configs: { + type: 'array', + items: { + type: 'object', + properties: { + name: { type: 'string', minLength: 1 }, + value: { type: ['string', 'null'] } + }, + required: ['name'], + additionalProperties: false + }, + minItems: 1 + } + }, + required: ['resourceType', 'resourceName', 'configs'], + additionalProperties: false + }, + minItems: 1 + }, + validateOnly: { type: 'boolean' } + }, + required: ['resources'], + additionalProperties: false +} + +export const incrementalAlterConfigsOptionsSchema = { + type: 'object', + properties: { + resources: { + type: 'array', + items: { + type: 'object', + properties: { + resourceType: { type: 'number', enum: Object.values(ResourceTypes) }, + resourceName: { type: 'string', minLength: 1 }, + configs: { + type: 'array', + items: { + oneOf: [ + { + type: 'object', + properties: { + name: { type: 'string', minLength: 1 }, + configOperation: { + type: 'number', + enum: [ + IncrementalAlterConfigOperationTypes.SET, + IncrementalAlterConfigOperationTypes.APPEND, + IncrementalAlterConfigOperationTypes.SUBTRACT + ] + }, + value: { type: 'string' } + }, + required: ['name', 'configOperation', 'value'], + additionalProperties: false + }, + { + type: 'object', + properties: { + name: { type: 'string', minLength: 1 }, + configOperation: { type: 'number', enum: [IncrementalAlterConfigOperationTypes.DELETE] } + }, + required: ['name', 'configOperation'], + additionalProperties: false + } + ] + }, + minItems: 1 + } + }, + required: ['resourceType', 'resourceName', 'configs'], + additionalProperties: false + }, + minItems: 1 + }, + validateOnly: { type: 'boolean' } + }, + required: ['resources'], + additionalProperties: false +} + export const createTopicsOptionsValidator = ajv.compile(createTopicOptionsSchema) export const listTopicsOptionsValidator = ajv.compile(listTopicOptionsSchema) export const deleteTopicsOptionsValidator = ajv.compile(deleteTopicOptionsSchema) @@ -208,3 +331,6 @@ export const deleteGroupsOptionsValidator = ajv.compile(deleteGroupsOptionsSchem export const describeClientQuotasOptionsValidator = ajv.compile(describeClientQuotasOptionsSchema) export const alterClientQuotasOptionsValidator = ajv.compile(alterClientQuotasOptionsSchema) export const describeLogDirsOptionsValidator = ajv.compile(describeLogDirsOptionsSchema) +export const describeConfigsOptionsValidator = ajv.compile(describeConfigsOptionsSchema) +export const alterConfigsOptionsValidator = ajv.compile(alterConfigsOptionsSchema) +export const incrementalAlterConfigsOptionsValidator = ajv.compile(incrementalAlterConfigsOptionsSchema) diff --git a/src/clients/admin/types.ts b/src/clients/admin/types.ts index ac86e19..937f28d 100644 --- a/src/clients/admin/types.ts +++ b/src/clients/admin/types.ts @@ -1,12 +1,18 @@ import { type AlterClientQuotasRequestEntry } from '../../apis/admin/alter-client-quotas-v1.ts' import { type CreateTopicsRequestTopicConfig } from '../../apis/admin/create-topics-v7.ts' +import { type AlterConfigsRequestResource } from '../../apis/admin/alter-configs-v2.ts' +import { + type DescribeConfigsRequestResource, + type DescribeConfigsResponseConfig +} from '../../apis/admin/describe-configs-v4.ts' import { type DescribeClientQuotasRequestComponent } from '../../apis/admin/describe-client-quotas-v0.ts' import { type DescribeLogDirsRequestTopic, type DescribeLogDirsResponse, type DescribeLogDirsResponseResult } from '../../apis/admin/describe-log-dirs-v4.ts' -import { type ConsumerGroupState } from '../../apis/enumerations.ts' +import { type IncrementalAlterConfigsRequestResource } from '../../apis/admin/incremental-alter-configs-v1.ts' +import { type ConfigResource, type ConsumerGroupState } from '../../apis/enumerations.ts' import { type NullableString } from '../../protocol/definitions.ts' import { type BaseOptions } from '../base/types.ts' import { type ExtendedGroupProtocolSubscription, type GroupAssignment } from '../consumer/types.ts' @@ -98,3 +104,25 @@ export interface BrokerLogDirDescription { throttleTimeMs: DescribeLogDirsResponse['throttleTimeMs'] results: Omit[] } + +export interface DescribeConfigsOptions { + resources: DescribeConfigsRequestResource[] + includeSynonyms?: boolean + includeDocumentation?: boolean +} + +export interface ConfigDescription { + resourceType: ConfigResource + resourceName: string + configs: DescribeConfigsResponseConfig[] +} + +export interface AlterConfigsOptions { + resources: AlterConfigsRequestResource[] + validateOnly?: boolean +} + +export interface IncrementalAlterConfigsOptions { + resources: IncrementalAlterConfigsRequestResource[] + validateOnly?: boolean +} diff --git a/src/diagnostic.ts b/src/diagnostic.ts index 11f2b0d..8d30330 100644 --- a/src/diagnostic.ts +++ b/src/diagnostic.ts @@ -77,6 +77,7 @@ export const adminTopicsChannel = createTracingChannel('a export const adminGroupsChannel = createTracingChannel('admin:groups') export const adminClientQuotasChannel = createTracingChannel('admin:clientQuotas') export const adminLogDirsChannel = createTracingChannel('admin:logDirs') +export const adminConfigsChannel = createTracingChannel('admin:configs') // Producer channels export const producerInitIdempotentChannel = createTracingChannel('producer:initIdempotent') diff --git a/test/apis/admin/alter-configs-v2.test.ts b/test/apis/admin/alter-configs-v2.test.ts index 9838385..3182d66 100644 --- a/test/apis/admin/alter-configs-v2.test.ts +++ b/test/apis/admin/alter-configs-v2.test.ts @@ -5,21 +5,23 @@ import { alterConfigsV2, Reader, ResponseError, Writer } from '../../../src/inde const { createRequest, parseResponse } = alterConfigsV2 test('createRequest serializes basic parameters correctly', () => { - const resources = [ - { - resourceType: 2, // Topic - resourceName: 'test-topic', - configs: [ - { - name: 'cleanup.policy', - value: 'compact' - } - ] - } - ] const validateOnly = false - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // Topic + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + value: 'compact' + } + ] + } + ], + validateOnly + ) // Verify it returns a Writer ok(writer instanceof Writer, 'Should return a Writer instance') @@ -71,31 +73,33 @@ test('createRequest serializes basic parameters correctly', () => { }) test('createRequest serializes multiple resources correctly', () => { - const resources = [ - { - resourceType: 2, // Topic - resourceName: 'test-topic-1', - configs: [ - { - name: 'cleanup.policy', - value: 'compact' - } - ] - }, - { - resourceType: 2, // Topic - resourceName: 'test-topic-2', - configs: [ - { - name: 'retention.ms', - value: '86400000' - } - ] - } - ] const validateOnly = false - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // Topic + resourceName: 'test-topic-1', + configs: [ + { + name: 'cleanup.policy', + value: 'compact' + } + ] + }, + { + resourceType: 2, // Topic + resourceName: 'test-topic-2', + configs: [ + { + name: 'retention.ms', + value: '86400000' + } + ] + } + ], + validateOnly + ) const reader = Reader.from(writer) // Read resources array @@ -146,29 +150,31 @@ test('createRequest serializes multiple resources correctly', () => { }) test('createRequest serializes multiple configs correctly', () => { - const resources = [ - { - resourceType: 2, // Topic - resourceName: 'test-topic', - configs: [ - { - name: 'cleanup.policy', - value: 'compact' - }, - { - name: 'retention.ms', - value: '86400000' - }, - { - name: 'min.insync.replicas', - value: '2' - } - ] - } - ] const validateOnly = false - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // Topic + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + value: 'compact' + }, + { + name: 'retention.ms', + value: '86400000' + }, + { + name: 'min.insync.replicas', + value: '2' + } + ] + } + ], + validateOnly + ) const reader = Reader.from(writer) // Read resources array @@ -208,21 +214,23 @@ test('createRequest serializes multiple configs correctly', () => { }) test('createRequest serializes null config values correctly', () => { - const resources = [ - { - resourceType: 2, // Topic - resourceName: 'test-topic', - configs: [ - { - name: 'cleanup.policy', - value: null - } - ] - } - ] const validateOnly = false - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // Topic + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + value: null + } + ] + } + ], + validateOnly + ) const reader = Reader.from(writer) // Read resources array @@ -245,21 +253,23 @@ test('createRequest serializes null config values correctly', () => { }) test('createRequest serializes validateOnly flag correctly', () => { - const resources = [ - { - resourceType: 2, // Topic - resourceName: 'test-topic', - configs: [ - { - name: 'cleanup.policy', - value: 'compact' - } - ] - } - ] const validateOnly = true - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // Topic + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + value: 'compact' + } + ] + } + ], + validateOnly + ) const reader = Reader.from(writer) // Skip resources array diff --git a/test/apis/admin/describe-configs-v2.test.ts b/test/apis/admin/describe-configs-v2.test.ts index daecbaf..28da7cd 100644 --- a/test/apis/admin/describe-configs-v2.test.ts +++ b/test/apis/admin/describe-configs-v2.test.ts @@ -25,12 +25,16 @@ test('createRequest serializes basic parameters correctly', () => { const reader = Reader.from(writer) // Read resources array - const resourcesArray = reader.readArray(() => { - const resourceType = reader.readInt8() - const resourceName = reader.readString(false) - const configurationKeys = reader.readArray(() => reader.readString(false), false, false) - return { resourceType, resourceName, configurationKeys } - }, false, false) + const resourcesArray = reader.readArray( + () => { + const resourceType = reader.readInt8() + const resourceName = reader.readString(false) + const configurationKeys = reader.readArray(() => reader.readString(false), false, false) + return { resourceType, resourceName, configurationKeys } + }, + false, + false + ) // Read includeSynonyms boolean const includeSynonymsValue = reader.readBoolean() @@ -75,12 +79,16 @@ test('createRequest serializes configurationKeys correctly', () => { const reader = Reader.from(writer) // Read resources array - const resourcesArray = reader.readArray(() => { - const resourceType = reader.readInt8() - const resourceName = reader.readString(false) - const configurationKeys = reader.readArray(() => reader.readString(false), false, false) - return { resourceType, resourceName, configurationKeys } - }, false, false) + const resourcesArray = reader.readArray( + () => { + const resourceType = reader.readInt8() + const resourceName = reader.readString(false) + const configurationKeys = reader.readArray(() => reader.readString(false), false, false) + return { resourceType, resourceName, configurationKeys } + }, + false, + false + ) // Verify configuration keys deepStrictEqual( @@ -110,12 +118,16 @@ test('createRequest serializes multiple resources correctly', () => { const reader = Reader.from(writer) // Read resources array - const resourcesArray = reader.readArray(() => { - const resourceType = reader.readInt8() - const resourceName = reader.readString(false) - const configurationKeys = reader.readArray(() => reader.readString(false), false, false) - return { resourceType, resourceName, configurationKeys } - }, false, false) + const resourcesArray = reader.readArray( + () => { + const resourceType = reader.readInt8() + const resourceName = reader.readString(false) + const configurationKeys = reader.readArray(() => reader.readString(false), false, false) + return { resourceType, resourceName, configurationKeys } + }, + false, + false + ) // Verify multiple resources deepStrictEqual( @@ -151,12 +163,16 @@ test('createRequest serializes include flags correctly', () => { const reader = Reader.from(writer) // Skip resources array - reader.readArray(() => { - reader.readInt8() - reader.readString(false) - reader.readArray(() => reader.readString(false), false, false) - return {} - }, false, false) + reader.readArray( + () => { + reader.readInt8() + reader.readString(false) + reader.readArray(() => reader.readString(false), false, false) + return {} + }, + false, + false + ) // Read include flags const includeSynonymsValue = reader.readBoolean() diff --git a/test/apis/admin/describe-configs-v3.test.ts b/test/apis/admin/describe-configs-v3.test.ts index a751b5d..0a8c114 100644 --- a/test/apis/admin/describe-configs-v3.test.ts +++ b/test/apis/admin/describe-configs-v3.test.ts @@ -25,12 +25,16 @@ test('createRequest serializes basic parameters correctly', () => { const reader = Reader.from(writer) // Read resources array - const resourcesArray = reader.readArray(() => { - const resourceType = reader.readInt8() - const resourceName = reader.readString(false) - const configurationKeys = reader.readArray(() => reader.readString(false), false, false) - return { resourceType, resourceName, configurationKeys } - }, false, false) + const resourcesArray = reader.readArray( + () => { + const resourceType = reader.readInt8() + const resourceName = reader.readString(false) + const configurationKeys = reader.readArray(() => reader.readString(false), false, false) + return { resourceType, resourceName, configurationKeys } + }, + false, + false + ) // Read includeSynonyms boolean const includeSynonymsValue = reader.readBoolean() @@ -75,12 +79,16 @@ test('createRequest serializes configurationKeys correctly', () => { const reader = Reader.from(writer) // Read resources array - const resourcesArray = reader.readArray(() => { - const resourceType = reader.readInt8() - const resourceName = reader.readString(false) - const configurationKeys = reader.readArray(() => reader.readString(false), false, false) - return { resourceType, resourceName, configurationKeys } - }, false, false) + const resourcesArray = reader.readArray( + () => { + const resourceType = reader.readInt8() + const resourceName = reader.readString(false) + const configurationKeys = reader.readArray(() => reader.readString(false), false, false) + return { resourceType, resourceName, configurationKeys } + }, + false, + false + ) // Verify configuration keys deepStrictEqual( @@ -110,12 +118,16 @@ test('createRequest serializes multiple resources correctly', () => { const reader = Reader.from(writer) // Read resources array - const resourcesArray = reader.readArray(() => { - const resourceType = reader.readInt8() - const resourceName = reader.readString(false) - const configurationKeys = reader.readArray(() => reader.readString(false), false, false) - return { resourceType, resourceName, configurationKeys } - }, false, false) + const resourcesArray = reader.readArray( + () => { + const resourceType = reader.readInt8() + const resourceName = reader.readString(false) + const configurationKeys = reader.readArray(() => reader.readString(false), false, false) + return { resourceType, resourceName, configurationKeys } + }, + false, + false + ) // Verify multiple resources deepStrictEqual( @@ -151,12 +163,16 @@ test('createRequest serializes include flags correctly', () => { const reader = Reader.from(writer) // Skip resources array - reader.readArray(() => { - reader.readInt8() - reader.readString(false) - reader.readArray(() => reader.readString(false), false, false) - return {} - }, false, false) + reader.readArray( + () => { + reader.readInt8() + reader.readString(false) + reader.readArray(() => reader.readString(false), false, false) + return {} + }, + false, + false + ) // Read include flags const includeSynonymsValue = reader.readBoolean() diff --git a/test/apis/admin/describe-configs-v4.test.ts b/test/apis/admin/describe-configs-v4.test.ts index 12cb4c9..0424b29 100644 --- a/test/apis/admin/describe-configs-v4.test.ts +++ b/test/apis/admin/describe-configs-v4.test.ts @@ -6,17 +6,20 @@ import { Reader, ResponseError, Writer, describeConfigsV4 } from '../../../src/i const { createRequest, parseResponse } = describeConfigsV4 test('createRequest serializes basic parameters correctly', () => { - const resources = [ - { - resourceType: 2, // TOPIC - resourceName: 'test-topic', - configurationKeys: [] - } - ] const includeSynonyms = true const includeDocumentation = false - const writer = createRequest(resources, includeSynonyms, includeDocumentation) + const writer = createRequest( + [ + { + resourceType: 2, // TOPIC + resourceName: 'test-topic', + configurationKeys: [] + } + ], + includeSynonyms, + includeDocumentation + ) // Verify it returns a Writer ok(writer instanceof Writer, 'Should return a Writer instance') @@ -61,17 +64,20 @@ test('createRequest serializes basic parameters correctly', () => { }) test('createRequest serializes configurationKeys correctly', () => { - const resources = [ - { - resourceType: 2, // TOPIC - resourceName: 'test-topic', - configurationKeys: ['cleanup.policy', 'compression.type'] - } - ] const includeSynonyms = false const includeDocumentation = false - const writer = createRequest(resources, includeSynonyms, includeDocumentation) + const writer = createRequest( + [ + { + resourceType: 2, // TOPIC + resourceName: 'test-topic', + configurationKeys: ['cleanup.policy', 'compression.type'] + } + ], + includeSynonyms, + includeDocumentation + ) const reader = Reader.from(writer) // Read resources array @@ -91,22 +97,25 @@ test('createRequest serializes configurationKeys correctly', () => { }) test('createRequest serializes multiple resources correctly', () => { - const resources = [ - { - resourceType: 2, // TOPIC - resourceName: 'topic-1', - configurationKeys: ['cleanup.policy'] - }, - { - resourceType: 4, // BROKER - resourceName: '1', - configurationKeys: ['num.io.threads'] - } - ] const includeSynonyms = false const includeDocumentation = false - const writer = createRequest(resources, includeSynonyms, includeDocumentation) + const writer = createRequest( + [ + { + resourceType: 2, // TOPIC + resourceName: 'topic-1', + configurationKeys: ['cleanup.policy'] + }, + { + resourceType: 4, // BROKER + resourceName: '1', + configurationKeys: ['num.io.threads'] + } + ], + includeSynonyms, + includeDocumentation + ) const reader = Reader.from(writer) // Read resources array @@ -137,17 +146,20 @@ test('createRequest serializes multiple resources correctly', () => { }) test('createRequest serializes include flags correctly', () => { - const resources = [ - { - resourceType: 2, - resourceName: 'test-topic', - configurationKeys: [] - } - ] const includeSynonyms = true const includeDocumentation = true - const writer = createRequest(resources, includeSynonyms, includeDocumentation) + const writer = createRequest( + [ + { + resourceType: 2, + resourceName: 'test-topic', + configurationKeys: [] + } + ], + includeSynonyms, + includeDocumentation + ) const reader = Reader.from(writer) // Skip resources array @@ -229,14 +241,9 @@ test('parseResponse correctly processes a successful response with configs', () .appendBoolean(config.readOnly) .appendInt8(config.configSource) .appendBoolean(config.isSensitive) - .appendArray( - config.synonyms, - (w, synonym) => { - w.appendString(synonym.name).appendString(synonym.value).appendInt8(synonym.source) - }, - true, - false - ) + .appendArray(config.synonyms, (w, synonym) => { + w.appendString(synonym.name).appendString(synonym.value).appendInt8(synonym.source) + }) .appendInt8(config.configType) .appendString(config.documentation) }) @@ -318,7 +325,7 @@ test('parseResponse correctly processes multiple config entries', () => { .appendBoolean(config.readOnly) .appendInt8(config.configSource) .appendBoolean(config.isSensitive) - .appendArray(config.synonyms, () => {}, true, false) + .appendArray(config.synonyms, () => {}) .appendInt8(config.configType) .appendString(config.documentation) }) @@ -428,7 +435,7 @@ test('parseResponse handles multiple resources with mixed errors', () => { .appendBoolean(config.readOnly) .appendInt8(config.configSource) .appendBoolean(config.isSensitive) - .appendArray([], () => {}, true, false) // Empty synonyms + .appendArray([], () => {}) // Empty synonyms .appendInt8(config.configType) .appendString(config.documentation) } diff --git a/test/apis/admin/incremental-alter-configs-v1.test.ts b/test/apis/admin/incremental-alter-configs-v1.test.ts index 35b9ab1..bffe916 100644 --- a/test/apis/admin/incremental-alter-configs-v1.test.ts +++ b/test/apis/admin/incremental-alter-configs-v1.test.ts @@ -5,22 +5,24 @@ import { Reader, ResponseError, Writer, incrementalAlterConfigsV1 } from '../../ const { createRequest, parseResponse } = incrementalAlterConfigsV1 test('createRequest serializes basic parameters correctly', () => { - const resources = [ - { - resourceType: 2, // TOPIC - resourceName: 'test-topic', - configs: [ - { - name: 'cleanup.policy', - configOperation: 0, // SET - value: 'compact' - } - ] - } - ] const validateOnly = false - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // TOPIC + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: 0, // SET + value: 'compact' + } + ] + } + ], + validateOnly + ) // Verify it returns a Writer ok(writer instanceof Writer, 'Should return a Writer instance') @@ -71,33 +73,35 @@ test('createRequest serializes basic parameters correctly', () => { }) test('createRequest serializes multiple resources correctly', () => { - const resources = [ - { - resourceType: 2, // TOPIC - resourceName: 'topic-1', - configs: [ - { - name: 'cleanup.policy', - configOperation: 0, // SET - value: 'compact' - } - ] - }, - { - resourceType: 4, // BROKER - resourceName: '1', - configs: [ - { - name: 'log.retention.hours', - configOperation: 0, // SET - value: '168' - } - ] - } - ] const validateOnly = false - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // TOPIC + resourceName: 'topic-1', + configs: [ + { + name: 'cleanup.policy', + configOperation: 0, // SET + value: 'compact' + } + ] + }, + { + resourceType: 4, // BROKER + resourceName: '1', + configs: [ + { + name: 'log.retention.hours', + configOperation: 0, // SET + value: '168' + } + ] + } + ], + validateOnly + ) const reader = Reader.from(writer) // Read resources array @@ -122,32 +126,34 @@ test('createRequest serializes multiple resources correctly', () => { }) test('createRequest serializes multiple configs per resource correctly', () => { - const resources = [ - { - resourceType: 2, // TOPIC - resourceName: 'test-topic', - configs: [ - { - name: 'cleanup.policy', - configOperation: 0, // SET - value: 'compact' - }, - { - name: 'compression.type', - configOperation: 0, // SET - value: 'lz4' - }, - { - name: 'retention.ms', - configOperation: 0, // SET - value: '604800000' - } - ] - } - ] const validateOnly = false - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // TOPIC + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: 0, // SET + value: 'compact' + }, + { + name: 'compression.type', + configOperation: 0, // SET + value: 'lz4' + }, + { + name: 'retention.ms', + configOperation: 0, // SET + value: '604800000' + } + ] + } + ], + validateOnly + ) const reader = Reader.from(writer) // Read resources array @@ -175,37 +181,38 @@ test('createRequest serializes multiple configs per resource correctly', () => { }) test('createRequest serializes different config operations correctly', () => { - const resources = [ - { - resourceType: 2, // TOPIC - resourceName: 'test-topic', - configs: [ - { - name: 'cleanup.policy', - configOperation: 0, // SET - value: 'compact' - }, - { - name: 'min.insync.replicas', - configOperation: 1, // DELETE - value: null - }, - { - name: 'retention.ms', - configOperation: 2, // APPEND - value: '604800000' - }, - { - name: 'min.cleanable.dirty.ratio', - configOperation: 3, // SUBTRACT - value: '0.5' - } - ] - } - ] const validateOnly = false - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // TOPIC + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: 0, // SET + value: 'compact' + }, + { + name: 'min.insync.replicas', + configOperation: 1 // DELETE + }, + { + name: 'retention.ms', + configOperation: 2, // APPEND + value: '604800000' + }, + { + name: 'min.cleanable.dirty.ratio', + configOperation: 3, // SUBTRACT + value: '0.5' + } + ] + } + ], + validateOnly + ) const reader = Reader.from(writer) // Read resources array @@ -233,22 +240,24 @@ test('createRequest serializes different config operations correctly', () => { }) test('createRequest serializes validateOnly flag correctly', () => { - const resources = [ - { - resourceType: 2, // TOPIC - resourceName: 'test-topic', - configs: [ - { - name: 'cleanup.policy', - configOperation: 0, // SET - value: 'compact' - } - ] - } - ] const validateOnly = true - const writer = createRequest(resources, validateOnly) + const writer = createRequest( + [ + { + resourceType: 2, // TOPIC + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: 0, // SET + value: 'compact' + } + ] + } + ], + validateOnly + ) const reader = Reader.from(writer) // Skip resources array diff --git a/test/clients/admin/admin.test.ts b/test/clients/admin/admin.test.ts index 6528779..0ff4f4b 100644 --- a/test/clients/admin/admin.test.ts +++ b/test/clients/admin/admin.test.ts @@ -2,27 +2,39 @@ import { deepStrictEqual, ok, strictEqual } from 'node:assert' import { randomUUID } from 'node:crypto' import { test } from 'node:test' import { scheduler } from 'node:timers/promises' -import { ClientQuotaEntityTypes, ClientQuotaKeys } from '../../../src/apis/enumerations.ts' +import { + ClientQuotaEntityTypes, + ClientQuotaKeys, + ConfigResources, + ConfigSources, + ConfigTypes, + IncrementalAlterConfigOperationTypes +} from '../../../src/apis/enumerations.ts' import { kConnections } from '../../../src/clients/base/base.ts' import { Admin, adminClientQuotasChannel, + adminConfigsChannel, adminGroupsChannel, adminLogDirsChannel, adminTopicsChannel, alterClientQuotasV1, + alterConfigsV2, type BrokerLogDirDescription, type ClientDiagnosticEvent, ClientQuotaMatchTypes, type ClusterPartitionMetadata, + type ConfigDescription, Consumer, type CreatedTopic, type DescribeClientQuotasOptions, describeClientQuotasV0, + describeConfigsV4, describeGroupsV5, describeLogDirsV4, EMPTY_BUFFER, type GroupBase, + incrementalAlterConfigsV1, instancesChannel, listGroupsV5, MultipleErrors, @@ -175,6 +187,36 @@ test('all operations should fail when admin is closed', async t => { } catch (error) { strictEqual(error.message, 'Client is closed.') } + + // Attempt to call describeConfigs on closed admin + try { + await admin.describeConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic' }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message, 'Client is closed.') + } + + // Attempt to call alterConfigs on closed admin + try { + await admin.alterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic', configs: [] }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message, 'Client is closed.') + } + + // Attempt to call incrementalAlterConfigs on closed admin + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic', configs: [] }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message, 'Client is closed.') + } }) test('listTopics should list topics and support diagnostic channels', async t => { @@ -2409,3 +2451,2028 @@ test('describeLogDirs should handle unavailable API errors', async t => { strictEqual(error.errors[0].message.includes('Unsupported API DescribeLogDirs.'), true) } }) + +test('describeConfigs should list configs (TOPIC resource type)', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + const initialResult = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: [] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + strictEqual(Array.isArray(initialResult), true) + strictEqual(initialResult.length, 1) + strictEqual(initialResult[0].resourceType, ConfigResources.TOPIC) + strictEqual(initialResult[0].resourceName, topicName) + strictEqual(Array.isArray(initialResult[0].configs), true) + strictEqual(initialResult[0].configs.length > 0, true) + for (const config of initialResult[0].configs) { + strictEqual(typeof config.name, 'string') + strictEqual(typeof config.value, 'string') + strictEqual(typeof config.readOnly, 'boolean') + strictEqual(typeof config.configSource, 'number') + strictEqual(typeof config.isSensitive, 'boolean') + strictEqual(Array.isArray(config.synonyms), true) + strictEqual(typeof config.configType, 'number') + strictEqual(config.documentation, null) + } + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('describeConfigs should list only desired configs (TOPIC resource type)', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + const initialResult = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: ['cleanup.policy', 'retention.ms'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + deepStrictEqual(initialResult, [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + value: 'delete', + readOnly: false, + configSource: ConfigSources.DEFAULT_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LIST, + documentation: null + }, + { + name: 'retention.ms', + value: '604800000', + readOnly: false, + configSource: ConfigSources.DEFAULT_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LONG, + documentation: null + } + ] + } + ]) + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('describeConfigs should include documentation and synonyms if requested (TOPIC resource type)', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + const initialResult = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: ['cleanup.policy'] + } + ], + includeSynonyms: true, + includeDocumentation: true + }) + deepStrictEqual(initialResult, [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + value: 'delete', + readOnly: false, + configSource: ConfigSources.DEFAULT_CONFIG, + isSensitive: false, + synonyms: [ + { + name: 'log.cleanup.policy', + source: ConfigSources.DEFAULT_CONFIG, + value: 'delete' + } + ], + configType: ConfigTypes.LIST, + documentation: + 'This config designates the retention policy to use on log segments. The "delete" policy (which is the default) will discard old segments when their retention time or size limit has been reached. The "compact" policy will enable log compaction, which retains the latest value for each key. It is also possible to specify both policies in a comma-separated list (e.g. "delete,compact"). In this case, old segments will be discarded per the retention time and size configuration, while retained segments will be compacted.' + } + ] + } + ]) + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('describeConfigs should properly describe broker configs', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + + // This test is more about not crashing at all. If any request with Broker resource type + // is sent to the wrong broker, the broker will respond with an error and the test will fail. + const result = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.BROKER, + resourceName: '1', + configurationKeys: [] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '2', + configurationKeys: [] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '3', + configurationKeys: [] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + + ok(result) + strictEqual(result.length, 3) + strictEqual( + result.every(r => r.resourceType === ConfigResources.BROKER), + true + ) + strictEqual(result[0].resourceName, '1') + strictEqual(Array.isArray(result[0].configs), true) + strictEqual(result[1].resourceName, '2') + strictEqual(Array.isArray(result[1].configs), true) + strictEqual(result[2].resourceName, '3') + strictEqual(Array.isArray(result[2].configs), true) + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('describeConfigs should support diagnostic channels', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + const options = { + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: [] + } + ], + includeSynonyms: false, + includeDocumentation: false + } + const verifyTracingChannel = createTracingChannelVerifier( + adminConfigsChannel, + 'client', + { + start (context: ClientDiagnosticEvent) { + deepStrictEqual(context, { + client: admin, + operation: 'describeConfigs', + options, + operationId: mockedOperationId + }) + }, + asyncStart (context: ClientDiagnosticEvent) { + const result = context.result as ConfigDescription[] + strictEqual(Array.isArray(result), true) + strictEqual(result.length, 1) + }, + error (context: ClientDiagnosticEvent) { + ok(typeof context === 'undefined') + } + }, + (_label: string, data: ClientDiagnosticEvent) => data.operation === 'describeConfigs' + ) + await admin.describeConfigs(options) + verifyTracingChannel() + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('describeConfigs should validate options in strict mode', async t => { + const admin = createAdmin(t, { strict: true }) + + // Test with missing resources + try { + await admin.describeConfigs({} as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid additional property + try { + await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configurationKeys: [] + } + ], + invalidProperty: true + } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with invalid type for resources + try { + await admin.describeConfigs({ resources: 'not-an-array' } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid type for includeSynonyms + try { + await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configurationKeys: [] + } + ], + includeSynonyms: 'not-a-boolean' + } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('includeSynonyms'), true) + } + + // Test with invalid type for includeDocumentation + try { + await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configurationKeys: [] + } + ], + includeDocumentation: 'not-a-boolean' + } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('includeDocumentation'), true) + } + + // Test with empty resources array + try { + await admin.describeConfigs({ resources: [] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid type for resources elements + try { + await admin.describeConfigs({ + resources: ['not-an-object'] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid additional property in resource object + try { + await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configurationKeys: [], + invalidProperty: true + } as any + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with missing resourceType in resource object + try { + await admin.describeConfigs({ + resources: [{ resourceName: 'test-topic', configurationKeys: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceType'), true) + } + + // Test with missing resourceName in resource object + try { + await admin.describeConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, configurationKeys: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceName'), true) + } + + // Test with invalid type for resourceType + try { + await admin.describeConfigs({ + resources: [{ resourceType: true, resourceName: 'test-topic', configurationKeys: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceType'), true) + } + + // Test with invalid enum member for resourceType + try { + await admin.describeConfigs({ + resources: [{ resourceType: 9999, resourceName: 'test-topic', configurationKeys: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceType'), true) + } + + // Test with invalid type for resourceName + try { + await admin.describeConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 123, configurationKeys: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceName'), true) + } + + // Test with invalid empty resourceName + try { + await admin.describeConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: '', configurationKeys: [] }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceName'), true) + } + + // Test with invalid type for configurationKeys + try { + await admin.describeConfigs({ + resources: [ + { resourceType: ConfigResources.TOPIC, resourceName: 'test-topic', configurationKeys: 'not-an-array' } + ] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configurationKeys'), true) + } + + // Test with invalid type for configurationKeys elements + try { + await admin.describeConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic', configurationKeys: [123] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configurationKeys'), true) + } + + // Test with invalid empty configurationKeys elements + try { + await admin.describeConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic', configurationKeys: [''] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configurationKeys'), true) + } +}) + +test('describeConfigs should handle errors from Connection.get', async t => { + const admin = createAdmin(t) + mockConnectionPoolGet(admin[kConnections], 4) + try { + await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configurationKeys: [] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes('Describing configs failed.'), true) + } +}) + +test('describeConfigs should handle errors from the API', async t => { + const admin = createAdmin(t) + mockAPI(admin[kConnections], describeConfigsV4.api.key) + try { + await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configurationKeys: [] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes('Describing configs failed.'), true) + } +}) + +test('describeConfigs should handle unavailable API errors', async t => { + const admin = createAdmin(t) + mockUnavailableAPI(admin, 'DescribeConfigs') + try { + await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configurationKeys: [] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.errors[0].message.includes('Unsupported API DescribeConfigs.'), true) + } +}) + +test('alterConfigs should update configs', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { name: 'cleanup.policy', value: 'compact' }, + { name: 'retention.ms', value: '86400000' } + ] + } + ], + validateOnly: false + }) + + await scheduler.wait(1000) + + const result = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: ['cleanup.policy', 'retention.ms'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + deepStrictEqual(result, [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + value: 'compact', + readOnly: false, + configSource: ConfigSources.TOPIC_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LIST, + documentation: null + }, + { + name: 'retention.ms', + value: '86400000', + readOnly: false, + configSource: ConfigSources.TOPIC_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LONG, + documentation: null + } + ] + } + ]) + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('alterConfigs should properly update broker configs', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + + // This test is more about not crashing at all. If any request with Broker resource type + // is sent to the wrong broker, the broker will respond with an error and the test will fail. + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.BROKER, + resourceName: '1', + configs: [{ name: 'message.max.bytes', value: '2000000' }] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '2', + configs: [{ name: 'message.max.bytes', value: '2000000' }] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '3', + configs: [{ name: 'message.max.bytes', value: '2000000' }] + } + ] + }) + + await scheduler.wait(1000) + + const result = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.BROKER, + resourceName: '1', + configurationKeys: ['message.max.bytes'] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '2', + configurationKeys: ['message.max.bytes'] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '3', + configurationKeys: ['message.max.bytes'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + + deepStrictEqual(result, [ + { + resourceType: ConfigResources.BROKER, + resourceName: '1', + configs: [ + { + name: 'message.max.bytes', + value: '2000000', + readOnly: false, + configSource: ConfigSources.DYNAMIC_BROKER_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.INT, + documentation: null + } + ] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '2', + configs: [ + { + name: 'message.max.bytes', + value: '2000000', + readOnly: false, + configSource: ConfigSources.DYNAMIC_BROKER_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.INT, + documentation: null + } + ] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '3', + configs: [ + { + name: 'message.max.bytes', + value: '2000000', + readOnly: false, + configSource: ConfigSources.DYNAMIC_BROKER_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.INT, + documentation: null + } + ] + } + ]) + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('alterConfigs should not update configs when validateOnly is true', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { name: 'cleanup.policy', value: 'compact' }, + { name: 'retention.ms', value: '86400000' } + ] + } + ], + validateOnly: true + }) + + await scheduler.wait(1000) + + const result = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: ['cleanup.policy', 'retention.ms'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + deepStrictEqual(result, [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + value: 'delete', + readOnly: false, + configSource: ConfigSources.DEFAULT_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LIST, + documentation: null + }, + { + name: 'retention.ms', + value: '604800000', + readOnly: false, + configSource: ConfigSources.DEFAULT_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LONG, + documentation: null + } + ] + } + ]) + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('alterConfigs should support diagnostic channels', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + const options = { + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { name: 'cleanup.policy', value: 'compact' }, + { name: 'retention.ms', value: '86400000' } + ] + } + ], + validateOnly: false + } + const verifyTracingChannel = createTracingChannelVerifier( + adminConfigsChannel, + 'client', + { + start (context: ClientDiagnosticEvent) { + deepStrictEqual(context, { + client: admin, + operation: 'alterConfigs', + options, + operationId: mockedOperationId + }) + }, + asyncStart (context: ClientDiagnosticEvent) { + strictEqual(context.result, undefined) + }, + error (context: ClientDiagnosticEvent) { + ok(typeof context === 'undefined') + } + }, + (_label: string, data: ClientDiagnosticEvent) => data.operation === 'alterConfigs' + ) + await admin.alterConfigs(options) + verifyTracingChannel() + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('alterConfigs should validate options in strict mode', async t => { + const admin = createAdmin(t, { strict: true }) + + // Test with missing resources + try { + await admin.alterConfigs({} as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid additional property + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [{ name: 'cleanup.policy', value: 'compact' }] + } + ], + invalidProperty: true + } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with invalid type for resources + try { + await admin.alterConfigs({ resources: 'not-an-array' } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid type for validateOnly + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [{ name: 'cleanup.policy', value: 'compact' }] + } + ], + validateOnly: 'not-a-boolean' + } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('validateOnly'), true) + } + + // Test with empty resources array + try { + await admin.alterConfigs({ resources: [] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid type for resources elements + try { + await admin.alterConfigs({ + resources: ['not-an-object'] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid additional property in resource object + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [{ name: 'cleanup.policy', value: 'compact' }], + invalidProperty: true + } as any + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with missing resourceType in resource object + try { + await admin.alterConfigs({ + resources: [{ resourceName: 'test-topic', configs: [{ name: 'cleanup.policy', value: 'compact' }] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceType'), true) + } + + // Test with missing resourceName in resource object + try { + await admin.alterConfigs({ + resources: [ + { resourceType: ConfigResources.TOPIC, configs: [{ name: 'cleanup.policy', value: 'compact' }] } + ] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceName'), true) + } + + // Test with missing configs in resource object + try { + await admin.alterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic' }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configs'), true) + } + + // Test with invalid type for resourceType + try { + await admin.alterConfigs({ + resources: [ + { resourceType: true, resourceName: 'test-topic', configs: [{ name: 'cleanup.policy', value: 'compact' }] } + ] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceType'), true) + } + + // Test with invalid enum member for resourceType + try { + await admin.alterConfigs({ + resources: [ + { resourceType: 9999, resourceName: 'test-topic', configs: [{ name: 'cleanup.policy', value: 'compact' }] } + ] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceType'), true) + } + + // Test with invalid type for resourceName + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 123, + configs: [{ name: 'cleanup.policy', value: 'compact' }] + } + ] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceName'), true) + } + + // Test with invalid empty resourceName + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: '', + configs: [{ name: 'cleanup.policy', value: 'compact' }] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceName'), true) + } + + // Test with invalid type for configs + try { + await admin.alterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic', configs: 'not-an-array' }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configs'), true) + } + + // Test with invalid empty configs array + try { + await admin.alterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic', configs: [] }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configs'), true) + } + + // Test with invalid type for configs elements + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: ['not-an-object'] as any + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configs'), true) + } + + // Test with missing name in config object + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [{ value: 'compact' }] as any + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('name'), true) + } + + // Test with invalid additional property in config object + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [{ name: 'cleanup.policy', value: 'compact', invalidProperty: true } as any] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with invalid type for name in config object + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [{ name: 123, value: 'compact' }] as any + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('name'), true) + } + + // Test with invalid empty name in config object + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [{ name: '', value: 'compact' }] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('name'), true) + } + + // Test with invalid type for value in config object + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [{ name: 'cleanup.policy', value: 123 }] as any + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('value'), true) + } +}) + +test('alterConfigs should handle errors from Connection.get', async t => { + const admin = createAdmin(t) + mockConnectionPoolGet(admin[kConnections], 4) + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [{ name: 'cleanup.policy', value: 'compact' }] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes('Altering configs failed.'), true) + } +}) + +test('alterConfigs should handle errors from the API', async t => { + const admin = createAdmin(t) + mockAPI(admin[kConnections], alterConfigsV2.api.key) + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [{ name: 'cleanup.policy', value: 'compact' }] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes('Altering configs failed.'), true) + } +}) + +test('alterConfigs should handle unavailable API errors', async t => { + const admin = createAdmin(t) + mockUnavailableAPI(admin, 'AlterConfigs') + try { + await admin.alterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [{ name: 'cleanup.policy', value: 'compact' }] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.errors[0].message.includes('Unsupported API AlterConfigs.'), true) + } +}) + +test('incrementalAlterConfigs should be able to set, append, subtract, and delete configs', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } + ] + } + ], + validateOnly: false + }) + await scheduler.wait(1000) + const setResult = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: ['cleanup.policy'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + deepStrictEqual(setResult, [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + value: 'compact', + readOnly: false, + configSource: ConfigSources.TOPIC_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LIST, + documentation: null + } + ] + } + ]) + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.APPEND, + value: 'delete' + } + ] + } + ], + validateOnly: false + }) + await scheduler.wait(1000) + const appendResult = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: ['cleanup.policy'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + deepStrictEqual(appendResult, [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + value: 'compact,delete', + readOnly: false, + configSource: ConfigSources.TOPIC_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LIST, + documentation: null + } + ] + } + ]) + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SUBTRACT, + value: 'compact' + } + ] + } + ], + validateOnly: false + }) + await scheduler.wait(1000) + const subtractResult = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: ['cleanup.policy'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + deepStrictEqual(subtractResult, [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + value: 'delete', + readOnly: false, + configSource: ConfigSources.TOPIC_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LIST, + documentation: null + } + ] + } + ]) + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.DELETE + } + ] + } + ], + validateOnly: false + }) + await scheduler.wait(1000) + const deleteResult = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: ['cleanup.policy'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + deepStrictEqual(deleteResult, [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + value: 'delete', + readOnly: false, + // The config has been deleted from topic config, so it falls back to default + configSource: ConfigSources.DEFAULT_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LIST, + documentation: null + } + ] + } + ]) + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('incrementalAlterConfigs should properly update broker configs', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.BROKER, + resourceName: '1', + configs: [ + { + name: 'message.max.bytes', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: '2500000' + } + ] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '2', + configs: [ + { + name: 'message.max.bytes', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: '2500000' + } + ] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '3', + configs: [ + { + name: 'message.max.bytes', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: '2500000' + } + ] + } + ] + }) + + await scheduler.wait(1000) + + const result = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.BROKER, + resourceName: '1', + configurationKeys: ['message.max.bytes'] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '2', + configurationKeys: ['message.max.bytes'] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '3', + configurationKeys: ['message.max.bytes'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + + deepStrictEqual(result, [ + { + resourceType: ConfigResources.BROKER, + resourceName: '1', + configs: [ + { + name: 'message.max.bytes', + value: '2500000', + readOnly: false, + configSource: ConfigSources.DYNAMIC_BROKER_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.INT, + documentation: null + } + ] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '2', + configs: [ + { + name: 'message.max.bytes', + value: '2500000', + readOnly: false, + configSource: ConfigSources.DYNAMIC_BROKER_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.INT, + documentation: null + } + ] + }, + { + resourceType: ConfigResources.BROKER, + resourceName: '3', + configs: [ + { + name: 'message.max.bytes', + value: '2500000', + readOnly: false, + configSource: ConfigSources.DYNAMIC_BROKER_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.INT, + documentation: null + } + ] + } + ]) + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('incrementalAlterConfigs should not update configs when validateOnly is true', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } + ] + } + ], + validateOnly: true + }) + await scheduler.wait(200) + const result = await admin.describeConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configurationKeys: ['cleanup.policy'] + } + ], + includeSynonyms: false, + includeDocumentation: false + }) + deepStrictEqual(result, [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + value: 'delete', + readOnly: false, + configSource: ConfigSources.DEFAULT_CONFIG, + isSensitive: false, + synonyms: [], + configType: ConfigTypes.LIST, + documentation: null + } + ] + } + ]) + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('incrementalAlterConfigs should support diagnostic channels', async t => { + const admin = createAdmin(t) + const topicName = await createTopic(t, true) + const options = { + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: topicName, + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } + ] + } + ], + validateOnly: false + } + const verifyTracingChannel = createTracingChannelVerifier( + adminConfigsChannel, + 'client', + { + start (context: ClientDiagnosticEvent) { + deepStrictEqual(context, { + client: admin, + operation: 'incrementalAlterConfigs', + options, + operationId: mockedOperationId + }) + }, + asyncStart (context: ClientDiagnosticEvent) { + strictEqual(context.result, undefined) + }, + error (context: ClientDiagnosticEvent) { + ok(typeof context === 'undefined') + } + }, + (_label: string, data: ClientDiagnosticEvent) => data.operation === 'incrementalAlterConfigs' + ) + await admin.incrementalAlterConfigs(options) + verifyTracingChannel() + + // Clean up + await admin.deleteTopics({ topics: [topicName] }) +}) + +test('incrementalAlterConfigs should validate options in strict mode', async t => { + const admin = createAdmin(t, { strict: true }) + + // Test with missing resources + try { + await admin.incrementalAlterConfigs({} as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid additional property + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } + ] + } + ], + invalidProperty: true + } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with invalid type for resources + try { + await admin.incrementalAlterConfigs({ resources: 'not-an-array' } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid type for validateOnly + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } + ] + } + ], + validateOnly: 'not-a-boolean' + } as any) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('validateOnly'), true) + } + + // Test with empty resources array + try { + await admin.incrementalAlterConfigs({ resources: [] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid type for resources elements + try { + await admin.incrementalAlterConfigs({ + resources: ['not-an-object'] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resources'), true) + } + + // Test with invalid additional property in resource object + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } + ], + invalidProperty: true + } as any + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with missing resourceType in resource object + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceName: 'test-topic', configs: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceType'), true) + } + + // Test with missing resourceName in resource object + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, configs: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceName'), true) + } + + // Test with missing configs in resource object + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic' }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configs'), true) + } + + // Test with invalid type for resourceType + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceType: true, resourceName: 'test-topic', configs: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceType'), true) + } + + // Test with invalid enum member for resourceType + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceType: 9999, resourceName: 'test-topic', configs: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceType'), true) + } + + // Test with invalid type for resourceName + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 123, configs: [] }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceName'), true) + } + + // Test with invalid empty resourceName + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: '', configs: [] }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('resourceName'), true) + } + + // Test with invalid type for configs + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic', configs: 'not-an-array' }] as any + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configs'), true) + } + + // Test with invalid empty configs array + try { + await admin.incrementalAlterConfigs({ + resources: [{ resourceType: ConfigResources.TOPIC, resourceName: 'test-topic', configs: [] }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configs'), true) + } + + // Test with invalid type for configs elements + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: ['not-an-object'] as any + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configs'), true) + } + + // Test with invalid additional property in config object + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact', + invalidProperty: true + } as any + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with missing name in config object + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } as any + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('name'), true) + } + + // Test with missing configOperation in config object + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + name: 'cleanup.policy', + value: 'compact' + } as any + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configOperation'), true) + } + + // Test with missing value in config object when operation is not DELETE + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET + } as any + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('value'), true) + } + + // Test with invalid type for name in config object + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + name: 123, + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'test' + } as any + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('name'), true) + } + + // Test with invalid empty name in config object + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + name: '', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'test' + } + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('name'), true) + } + + // Test with invalid type for configOperation + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + name: 'test.config', + configOperation: 'not-a-number' as any, + value: 'test' + } + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configOperation'), true) + } + + // Test with invalid enum member for configOperation + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + name: 'test.config', + configOperation: 9999 as any, + value: 'test' + } + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('configOperation'), true) + } + + // Test with invalid type for value in config object when operation is not DELETE + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + name: 'test.config', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 123 as any + } + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('value'), true) + } + + // Test with invalid value provided for DELETE operation + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test', + configs: [ + { + name: 'test.config', + configOperation: IncrementalAlterConfigOperationTypes.DELETE, + // @ts-expect-error - Intentionally passing invalid options + value: 'should-not-be-provided' + } + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('value'), true) + } +}) + +test('incrementalAlterConfigs should handle errors from Connection.get', async t => { + const admin = createAdmin(t) + mockConnectionPoolGet(admin[kConnections], 4) + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes('Incrementally altering configs failed.'), true) + } +}) + +test('incrementalAlterConfigs should handle errors from the API', async t => { + const admin = createAdmin(t) + mockAPI(admin[kConnections], incrementalAlterConfigsV1.api.key) + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes('Incrementally altering configs failed.'), true) + } +}) + +test('incrementalAlterConfigs should handle unavailable API errors', async t => { + const admin = createAdmin(t) + mockUnavailableAPI(admin, 'IncrementalAlterConfigs') + try { + await admin.incrementalAlterConfigs({ + resources: [ + { + resourceType: ConfigResources.TOPIC, + resourceName: 'test-topic', + configs: [ + { + name: 'cleanup.policy', + configOperation: IncrementalAlterConfigOperationTypes.SET, + value: 'compact' + } + ] + } + ] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.errors[0].message.includes('Unsupported API IncrementalAlterConfigs.'), true) + } +})