构建可观察的动态命令行程序

本文讨论了构建可观察的动态命令行程序的一些实践。

需求

有些时候我们会开发出具有如下特征的命令行程序:

* 需要运行较长时间(几分钟至数小时,甚至数天),并有可能会放在持续集成系统中运行(例如Jenkins)
* 参数较多,且可以灵活配置

这种命令行程序在自动化测试领域很常见,例如用来批量生成某些数据、批量运行自动化测试用例等。这些程序的运行方式一般会从开发者在本地运行慢慢变成在持续集成系统中运行,从开发者主动触发运行慢慢变成定时地或被某些上游系统以调用持续集成系统API的方式自动运行。

动态修改命令行程序的参数

一般来说,一旦一个命令行程序开始运行,此时如果用户想修改某些参数,那就必须停掉正在运行的程序,修改参数后再重新运行这个程序。这在大部分情况下是没问题的,但如果用户希望在不停止程序运行的情况下动态地修改参数怎么办呢?有一些情形下我们希望具备这种能力,这里列举几个:

1. 一个较大的自动化测试用例集已经运行了一段时间,此时用户发现原来的用例集不全,还需要再加入一些新用例。
2. 一个数据准备程序已经运行了一段时间,此时用户希望加入一些新数据。
3. 一个会向后端组件发送大量请求的命令行程序,刚开始时由于后端资源紧张并发数很低,之后后端资源充裕了想要提高并发数。

上述情况下,最直接的做法肯定是停掉正在运行的程序重新运行,但这么做也是有成本的,需要花费更多时间重启程序、或者在最后需要聚合多份自动化测试报告。重启程序的成本根据实际情况或高或低,不过无论如何,成本确实存在。

监控命令行程序

还有一种情形是用户希望看到长时间运行的命令行程序的状态,比如进度、成功率、并发数等信息。这些信息是动态的,传统方式下一般通过输出日志来实现。那还有没有其他的方式呢?能否把这些数据集成到 dashboard 里进行监控?


什么是可观察的动态命令行程序?

可观察的命令行程序

有了上面的需求,很自然地会去思考如何解决。先来看观察命令行程序的可能方式:

1. 输出日志/文件
2. 将数据写到外部系统(数据库/信息收集系统)
3. 在本地开放 HTTP API

1. 输出日志/文件

这是最简单直接的方式,优点是一目了然,方便保留过程日志,缺点是不利于提供即时信息,对监控和集成不友好。

2. 将数据写到外部系统(数据库/信息收集系统)

这种方式的优点是既可以保留过程日志也可以提供实时信息,缺点是开销较大,而且引入了外部依赖。

3. 在本地开放 HTTP API

这种方式会在命令行运行的本地启动一个 HTTP Server,对外提供 HTTP API。优点是方便获取及时信息,对监控和集成友好,缺点是不利于保留过程日志。

如何选择?

我们会比较希望既可以保留过程日志又可以监控即时信息,那么可以组合使用1和3,或者2和3。

动态

再来看可动态这个特性。一般来说,动态意味着可在运行时修改行为。对于命令行程序来说,有几种可能的方式:

1. 通过一个配置文件,命令行程序定期去读,用来更新内部数据
2. 监听消息队列
3. 通过本地开放的 HTTP API

定期读配置文件

虽然可以达到目的,但不太方便,尤其在 Jenkins 上执行时,用户需要远程连接到执行机来修改配置。很多情况下用户是没有权限这样做的。

监听消息队列

监听消息队列需要引入外部依赖,成本较高。

通过本地开放的 HTTP API

这种方式适用于本地和远程执行,也可以和上面提到的监控即时信息的做法相契合。

如何选择?

综合来看,通过在本地开放 HTTP API 来提供可观察的即时数据和动态修改程序行为是一种比较靠谱的方式。


实现

程序入口

通过在本地开放 HTTP API 来构建可观察的动态命令行程序有很多方式,这里我给出一个我觉得比较优雅的实现方式以供参考。

首先要说明的是,实现这个想法本身是框架无关的,可以基于任何框架去实现,甚至不用框架。不过既然有合适的工具,我们没理由置之不理,因此下面讨论的实现还是使用了框架来做。先简要介绍下这种方式,我使用了 nest.js 这个框架,这是个类 Spring 的 TS/JS 框架,内建了依赖注入支持,还有不少官方和第三方的优质插件。

我创建了一个示例工程 any-factory 来展示如何实现。这个示例展示了一个工厂,我们可以通过命令行指定需要生成哪些产品,以及它们的个数。另外还提供了可选的 HTTP Server 来实时获取工厂内部数据和动态修改待生产的产品参数的能力。

先来看入口文件 main.ts:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// src/main.ts
import { NestFactory } from '@nestjs/core';
import { ConsoleLogger } from '@nestjs/common';
import { BootstrapConsole } from 'nestjs-console';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
import { AppModule } from './app.module';
import { ExceptionInterceptor } from '@infrastructure/interceptors/exception.interceptor';

export class CustomBootstrapConsole extends BootstrapConsole {
async create() {
const app = await NestFactory.create(AppModule);

const appLogger = new ConsoleLogger('Any Factory');
const config = new DocumentBuilder()
.setTitle('Any Factory')
.setDescription('The any factory web API description')
.setVersion('1.0')
.addTag('Any Factory')
.build();
const document = SwaggerModule.createDocument(app, config);
SwaggerModule.setup('docs', app, document);

app.useGlobalInterceptors(new ExceptionInterceptor());

app.enableShutdownHooks();

app.getHttpServer().on('listening', () => {
const port = app.getHttpServer().address().port;
appLogger.log(`Application is listening on port ${port}`);
});

await app.listen(0);

return app;
}
}

const withServer = (): boolean => {
return !!process.argv.find((e) => e === '--with-server');
};

const getBootstrap = (): BootstrapConsole => {
if (withServer()) {
return new CustomBootstrapConsole({
module: AppModule,
withContainer: true, // This is the key that will give you access to the app container from the service cli
useDecorators: true,
contextOptions: { logger: false },
} as any);
}
return new BootstrapConsole({
module: AppModule,
useDecorators: true,
});
};

const bootstrap = getBootstrap();

bootstrap
.init()
.then(async (app) => {
await app.init();
await bootstrap.boot();
})
.catch((e) => {
process.exit(1);
});

入口文件有几个值得一提的地方,首先是使用了 nestjs-console 这个 npm package,由于 nest.js 默认不提供命令行的使用方式,因此我们需要这个库来提供命令行的支持。我们实现了一个 CustomBootstrapConsole 类,该类在 create 方法里创建了一个 nest.js HTTP server,并内部配置了一个 swagger module,最后监听在0端口。这里有个小技巧,如果监听0端口,则操作系统会随机分配一个可用的端口号。这么做的好处是我们不需要显式指定监听的端口号,可以在机器上同时运行多个该程序而不用担心端口号冲突。

withServer 方法用来判断是否需要启动一个 HTTP server:当命令行参数中存在 --with-server 选项时启动一个 HTTP server。

getBootstrap 方法作为 main.ts 的入口方法,负责判断命令行程序的启动方式:带 HTTP Server 或者不带。注意看 getBootstrap 的实现,当 withServer() 为 true 时,我们创建了一个 CustomBootstrapConsole 实例,参数 withContainer: true 非常重要,它允许我们从 CLI 中访问 app container,在这里就是 module: AppModule。这个特性非常重要,它允许 CLI 和 我们的应用程序通信。这里我们倒不需要去追究它的实现方式。接着就是初始化了,这部分 nest.js 框架的方法会帮我们处理。

有一幅图帮助理解这个项目的基本结构:

any-factory的基本结构

接着看 app.module.ts 里的 AppModule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from 'nestjs-config';
import * as path from 'path';
import { MonitorModule } from '@modules/monitor/monitor.module';
import { ProductionModule } from '@modules/production/production.module';

@Module({
imports: [
ConfigModule.load(path.resolve(__dirname, 'config', '**/!(*.d).{ts,js}')),
MonitorModule,
ProductionModule,
],
controllers: [],
providers: [],
})
export class AppModule {}

抛开 nest.js 具体的使用细节不谈(尽管这也很重要,但目前它们对我们来说是手段而不是目的,故先避开),在 AppModule 中我们导入了3个 module: ProductionModule, MonitorModuleConfigModuleProductionModule 是这个项目的核心上下文,负责生产产品。MonitorModule 负责提供可观察、动态修改命令行程序的入口。ConfigModule 负责配置处理文件。为了保持示例的简单,这里只导入3个modules,但已足够说明问题了。

生产模块

接着来看 ProductionModule:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// src/modules/production/production.module.ts
import { Module, Global } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { ConsoleModule } from 'nestjs-console';
import { ProduceProductCliController } from './commands/produce-product/produce-product.cli.controller';
import { ProduceProductService } from './commands/produce-product/produce-product.service';
import { UpdateSchedulerService } from '../monitor/commands/update-scheduler/update-scheduler.service';
import { productionServiceLoggerProvider } from './providers/production.providers';

const cliControllers = [ProduceProductCliController];

const commandHandlers = [ProduceProductService, UpdateSchedulerService];

const customProviders = [productionServiceLoggerProvider];

@Global()
@Module({
imports: [CqrsModule, ConsoleModule],
controllers: [],
providers: [...cliControllers, ...commandHandlers, ...customProviders],
exports: [...commandHandlers],
})
export class ProductionModule {}

production.module.ts 这种模块文件本身没有任何逻辑,它只是一个模块的入口,负责组装模块。imports 字段指明该模块依赖两个外部模块 CqrsModuleConsoleModule,稍后会解释其含义。controllers 字段本来是用来声明需要注入的 HTTP controllers,但在这里 ProductionModule 并不直接提供 HTTP API,所以是个空数组。我们稍后会看到这个字段在 MonitorModule 是有用的。在 providers 字段中声明了一些东西:

  • cliControllers: 命令行相关的 controllers,即用户和应用程序核心之间的命令行控制器。
  • commandHandlers: 负责执行命令行控制器接收到的命令,cliControllers 是 commandHandlers 的直接用户。
  • customProviders: 一些自定义 providers,这里只有一个 logger。

接着看 produce-product.cli.controller.ts:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// src/modules/production/commands/produce-product/produce-product.cli.controller.ts
import { Injectable } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { Command, Console } from 'nestjs-console';
import { ProduceProductCommand } from './produce-product.command';
import { Production } from '@src/interface-adapters/interfaces/production/production.interface';

@Console()
@Injectable()
export class ProduceProductCliController {
constructor(private readonly commandBus: CommandBus) {}

@Command({
command: 'produce-products',
description: 'Produce products',
options: [
{
flags: '-s, --specs <specs>',
required: true,
fn: (value) => value.split(';'),
description: 'Product specs',
},
{
flags: '-c, --concurrency <concurrency>',
required: false,
defaultValue: 1,
fn: (value) => parseInt(value),
description: 'Concurrency of pipeline',
},
{
flags: '--with-server',
required: false,
description:
'Will start a HTTP server to provide a way to inspect some internal data if specified',
},
],
})
async produceProducts(opts: Production): Promise<void> {
const command = new ProduceProductCommand(opts);
await this.commandBus.execute(command);
}
}

@Console() 装饰器声明这是一个命令行控制器,另外在构造参数中有一个 CommandBus,这个东西来自 @nestjs/cqrs 这个 package,CQRS 表示命令查询职责分离,也就是我们将读操作和写操作分开处理,这在微服务和领域驱动设计(DDD)的实践中有非常多的应用,由于这次的主题并不是关于 CQRS 的,这里只是顺带一提。简单来说在 CQRS 将写操作视为一种命令(command),这里的 CommandBus 提供了一种执行命令的一致性方法。在 produceProducts 方法中,我们使用 @Command() 装饰器声明了一个命令,用来生产产品,里面的参数简单易懂,这里就不细说了。继续看这个方法的实现,我们使用方法入参 opt 实例化了一个 ProduceProductCommand 命令对象,这个对象其实只是一个命令信息的载体而已,没什么特别的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/modules/production/commands/produce-product/produce-product.command.ts
import {
Command,
CommandProps,
} from '@src/libs/ddd/domain/base-classes/command.base';

export class ProduceProductCommand extends Command {
constructor(props: CommandProps<ProduceProductCommand>) {
super(props);
this.specs = props.specs;
this.concurrency = props.concurrency;
}

readonly specs: string[];

readonly concurrency: number;
}

真正有意思的地方是:

1
await this.commandBus.execute(command);

这就是上面提到的 执行命令的一致性方法。它的神奇之处在于在 produceProducts 方法中,我们不需要了解谁会负责处理这个 ProduceProductCommand 命令对象,我们只需要知道把这个命令对象传给 this.commandBusexecute 方法就行了。

那谁会来处理 ProduceProductCommand 命令对象呢?它是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// src/modules/production/commands/produce-product/produce-product.service.ts
import { Injectable, Inject } from '@nestjs/common';
import { CommandBus, CommandHandler } from '@nestjs/cqrs';
import { ConfigService } from 'nestjs-config';
import { Result } from '@libs/ddd/domain/utils/result.util';
import { Logger } from '@libs/ddd/domain/ports/logger.port';
import { CommandHandlerBase } from '@src/libs/ddd/domain/base-classes/command-handler.base';
import { ProduceProductCommand } from './produce-product.command';
import { PipelineEntity } from '@modules/production/domain/entities/pipeline.entity';
import { Production } from '@modules/production/domain/value-objects/production.value-object';
import { Spec } from '@modules/production/domain/value-objects/spec.value-object';
import { Concurrency } from '@modules/production/domain/value-objects/concurrency.value-object';
import { Summary } from '@modules/production/domain/value-objects/summary.value-object';
import { produceProductServiceLoggerSymbol } from '@modules/production/providers/production.providers';

@Injectable()
@CommandHandler(ProduceProductCommand)
export class ProduceProductService extends CommandHandlerBase {
private pipelineEntity: PipelineEntity;
private isRunning = false;

constructor(
private readonly commandBus: CommandBus,
@Inject(produceProductServiceLoggerSymbol)
private readonly logger: Logger,
private readonly config: ConfigService,
) {
super();
}

async handle(
command: ProduceProductCommand,
): Promise<Result<boolean, Error>> {
const specServer = this.config.get('app.specServer');
const production = new Production({
specs: command.specs.map((spec) => {
const [name, count] = spec.split(':');
return new Spec({
name,
count: isNaN(parseInt(count)) ? 1 : parseInt(count),
});
}),
concurrency: new Concurrency({
n: command.concurrency,
}),
});
this.logger.log(`Spec server: ${specServer}`);
this.logger.log(
`Produces products: ${JSON.stringify(production.getRawProps(), null, 2)}`,
);
const result = PipelineEntity.create({ production: production });
return result.unwrap(
async (pipeline) => {
this.isRunning = true;
this.pipelineEntity = pipeline;
await this.pipelineEntity.run();
return Result.ok(true);
},
async (error) => {
return Result.err(error);
},
);
}

isAvailable(): boolean {
return this.isRunning;
}

getConcurrency(): number {
return this.pipelineEntity.getConcurrency();
}

setConcurrency(value: number) {
this.logger.log(`Sets concurrency to: ${value}`);
const newConcurrency = new Concurrency({
n: value,
});
this.pipelineEntity.setConcurrency(newConcurrency);
}

addSpecs(specs: string[]) {
this.logger.log(`Adds specs: ${specs}`);
const additionalSpecs = specs.map((spec) => {
const [name, count] = spec.split(':');
return new Spec({
name,
count: isNaN(parseInt(count)) ? 1 : parseInt(count),
});
});
this.pipelineEntity.addSpecs(additionalSpecs);
}

getSummary(): Summary {
return this.pipelineEntity.getSummary();
}

getSpecs(): Spec[] {
return this.pipelineEntity.getSpecs();
}
}

又是一长串代码,我们只看要点。下面的装饰器声明了这个类会负责处理 ProduceProductCommand 命令对象:

1
@CommandHandler(ProduceProductCommand)

第二个重点是 handle 方法。如果你仔细看会发现这个方法已经碰到一点点业务的边了,但这还不是真正的业务逻辑,只是业务逻辑的组装。这个位置对应到领域驱动设计中是应用层(Appplicaiton Layer)。在这个方法中,我们先实例化了一个 Production 值对象(Value Object),值对象不包含业务逻辑,它用来表示领域概念。比如在这里表示生产这个概念。需要注意的是,在这个建模过程中,我们始终紧密围绕生产这个核心上下文进行建模,以更好地表达领域语言。再往后看,出现了一个 PipelineEntityProduction 值对象作为 PipelineEntitycreate 工厂方法的参数。PipelineEntity 是整个生产过程的核心,几乎所有业务逻辑都封装在这里面。pipelineEntity.run() 开始执行生产任务。

其他的一些方法,比如 isAvailablegetConcurrencysetConcurrency 等方法要么是维护一些简单的状态,要么只是 pipelineEntity 的代理,用来向外部提供一些方法以在可控的范围内获取/修改 pipelineEntity 的内部信息。

关于 pipeline.entity.ts 里的 PipelineEntity 只需要知道两点:

  • PipelineEntity 是具体处理业务逻辑的地方。
  • PipelineEntity 会以一个给定的并发数从任务队列中取出产品规格进行生产,直到生产完全部产品。
  • 我们可以通过 PipelineEntity 的 setConcurrency 方法设置并发数,通过 addSpecs 方法添加需要生产的产品规格和相应数量。

监控模块

monitor.module.ts 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// src/modules/monitor/monitor.module.ts
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { MonitorHttpController } from './queries/monitor.http.controller';
import { MonitorQueryHandler } from './queries/monitor.query-handler';
import { UpdatePipelineHttpController } from '@modules/monitor/commands/update-pipeline/update-pipeline.http.controller';
import { ProductionModule } from '@modules/production/production.module';

const httpControllers = [MonitorHttpController, UpdatePipelineHttpController];

@Module({
imports: [CqrsModule, ProductionModule],
controllers: [...httpControllers],
providers: [MonitorQueryHandler],
})
export class MonitorModule {}

我们将 ProductionModule 导入 MonitorModule,因为后者依赖前者。controllers 里是 MonitorModule 开放给外部的 HTTP API。httpControllersMonitorHttpController 里放的是查询 controller,UpdatePipelineHttpController 放的是命令 controller。这里专门做了区分。providers 字段里是一个 MonitorQueryHandler,CQRS 将命令和查询分离,因此命令处理器和查询处理器也要分离。

先看 monitor.http.controller.ts:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// src/modules/monitor/queries/monitor.http.controller.ts
import { Controller, HttpStatus, Get } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
import { MonitorQueryHandler } from './monitor.query-handler';
import { StatusHttpResponse } from '@modules/monitor/dtos/status.response.dto';

@ApiTags('Monitor')
@Controller()
export class MonitorHttpController {
constructor(private readonly monitorQueryHandler: MonitorQueryHandler) {}

@Get('/pipeline/status')
@ApiOperation({ summary: 'Get status of pipeline' })
@ApiResponse({
status: HttpStatus.OK,
type: StatusHttpResponse,
})
getStatus(): StatusHttpResponse {
return new StatusHttpResponse(this.monitorQueryHandler.getPipelineStatus());
}
}

MonitorHttpController 很简单,暴露一个 endpoint GET /pipeline/status,内部直接调用 MonitorQueryHandlergetPipelineStatus 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// src/modules/monitor/queries/monitor.query-handler.ts
import { Injectable } from '@nestjs/common';
import { ProduceProductService } from '@modules/production/commands/produce-product/produce-product.service';
import { ProductionStatus } from '@src/interface-adapters/interfaces/production/production-status.interface';

@Injectable()
export class MonitorQueryHandler {
constructor(private readonly produceProductService: ProduceProductService) {}

getPipelineStatus(): ProductionStatus {
if (!this.produceProductService.isAvailable()) {
return {} as ProductionStatus;
}
return {
summary: this.produceProductService.getSummary().getRawProps(),
concurrency: this.produceProductService.getConcurrency(),
specs: this.produceProductService
.getSpecs()
.map((spec) => spec.getRawProps()),
};
}
}

MonitorQueryHandler 里通过依赖注入的 ProduceProductService 获取 pipeline 状态。

接着再看 update-pipeline.http.controller.ts:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// src/modules/monitor/commands/update-pipeline/update-pipeline.http.controller.ts
import { ApiTags } from '@nestjs/swagger';
import { Controller, Patch, Body } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { UpdatePipelineHttpRequest } from './update-pipeline.request.dto';
import { UpdatePipelineCommand } from '@modules/monitor/commands/update-pipeline/update-pipeline.command';

@ApiTags('Monitor')
@Controller()
export class UpdatePipelineHttpController {
constructor(private readonly commandBus: CommandBus) {}

@Patch('/pipeline')
async update(@Body() body: UpdatePipelineHttpRequest) {
const command = new UpdatePipelineCommand(body);
await this.commandBus.execute(command);
return {};
}
}

UpdatePipelineHttpController 通过暴露一个 endpoint PATCH /pipeline 来动态更新命令行程序,它创建一个 UpdatePipelineCommand 实例,然后通过 this.commandBus 执行。

同样地,UpdatePipelineCommand 也是一个命令的信息载体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/modules/monitor/commands/update-pipeline/update-pipeline.command.ts
import {
Command,
CommandProps,
} from '@libs/ddd/domain/base-classes/command.base';

export class UpdatePipelineCommand extends Command {
constructor(props: CommandProps<UpdatePipelineCommand>) {
super(props);
this.concurrency = props.concurrency;
this.specs = props.specs;
}

readonly concurrency: number;

readonly specs: string[];
}

接着会触发 UpdatePipelineServicehandle 方法执行,这里也通过依赖注入的方式注入了一个 ProduceProductService实例,最后直接执行上面的 setConcurrencyaddSpecs 方法来动态修改生产上下文中的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// src/modules/monitor/commands/update-pipeline/update-pipeline.service.ts
import { Injectable, Scope, ConsoleLogger } from '@nestjs/common';
import { CommandBus, CommandHandler } from '@nestjs/cqrs';
import { Result } from '@libs/ddd/domain/utils/result.util';
import { CommandHandlerBase } from '@libs/ddd/domain/base-classes/command-handler.base';
import { ProduceProductService } from '@modules/production/commands/produce-product/produce-product.service';
import { UpdatePipelineCommand } from '@modules/monitor/commands/update-pipeline/update-pipeline.command';

@Injectable({
scope: Scope.DEFAULT,
})
@CommandHandler(UpdatePipelineCommand)
export class UpdatePipelineService extends CommandHandlerBase {
private logger = new ConsoleLogger(UpdatePipelineService.name);

constructor(
private readonly commandBus: CommandBus,
private readonly produceProductService: ProduceProductService,
) {
super();
}

async handle(
command: UpdatePipelineCommand,
): Promise<Result<boolean, Error>> {
if (!this.produceProductService.isAvailable()) {
return Result.ok(true);
}
this.produceProductService.setConcurrency(command.concurrency);
if (command.specs) {
this.produceProductService.addSpecs(command.specs);
}
return Result.ok(true);
}
}

让我们运行一下程序看看:

1
2
npm i -g @nullcc/any-factory
any-factory produce-products --specs="a:10;b:20;c:30" --concurrency=1 --with-server

命令行中的日志:

命令行中的日志

HTTP Server swagger API:

HTTP Server swagger API

请求 pipeline 状态:

请求 pipeline 状态

修改 pipeline 参数:

修改 pipeline 参数

请求更新后的 pipeline 状态:

请求更新后的 pipeline 状态

现在可以更新一下可观察的动态命令行程序的基本结构图以给出一种通用结构:

可观察的动态命令行程序

总结

在上面的实现中,我们在本地运行命令行程序时同时启动了一个 HTTP Server,对用户开放了一些 API,同时传统的日志记录形式还是可以继续使用(在文中和示例项目中没有展示出来)。在实现过程中,我们还通过模块化将不同上下文隔离开,它们之间可能会产生耦合,但都在我们的控制之中。例如虽然 MonitorModule 依赖于 ProductionModule,但后者的逻辑不会泄露到前者中。

上面除了讨论了如何构建可观察可修改的命令行程序以外,还应用了领域驱动设计的方法指导实现。当然,由于领域驱动设计是个相当大的主题,本文中无法详细描述。

参考资料

CQRS(命令查询职责分离)
Domain Driven Design(领域驱动设计)
DDD六边形架构(翻译)