Appearance
Sidecar hasta 1.0.8
Sidecar es un patrón que es muy útil en el mundo de los microservicios, ya que nos permite abstraer funcionalidad genérica de todos los servicios y además, no tener que hacer librerias en cada uno de estos lenguajes para cada funcionalidad (Más info link).
En nuestra implementación para kafka, lo hemos usado para abstraer a los servicios de:
- El driver a usar para conectar a kafka. Había problemas de kafka con node y el schema registry.
- Desplegar funcionalidad a todos los servicios, sin tener que hacer grandes cambios en los servicios. (Seguridad, multitenant, etc)
- El funcionamiento al detalle del consumidor/publicador de kafka.
- La gestión de Avro.
Flujos
Actualmente el Sidecar dispone de 3 flujos de funcionamiento. Y todas las comunicaciones entre el servicio y el sidecar es usando grpc.
El primero y que siempre tenemos que hacer es el Configure. Mandamos la configuración tanto para consumir como para publicar, en este último caso, deberemos mandar un mapa con el nombre del schema y el schema de Avro a utilizar:
Después se dipone de otros dos flujos que necesitan del Configure primero: *Flujo para consumir datos:
- Se inicia con el nombre del topic y la opción Subscribe, entonces recibiremos el primer dato.
- Si procesamos bien el dato, mandamos la opción Commit y recibiremos el siguiente mensaje.
- En caso de fallo al procesar se envía la opción Retry y recibimos el último dato que no hemos hecho commit.
- Flujo para producir datos:
- Enviamos el nombre del schema (El que pusimos en el Configure), el topic, la clave y los datos del mensaje.
- Se recibirá un mensaje con información sobre si ha ido bien o el error.
Configuración con variables de entorno
Se puede arrancar el Sidecar ya con la configuración predefinida a través de variables de entorno. Si estas variables estan configuradas, sobrescriben las variables que se pasan a través del método Configure.
- KAFKA_BOOTSTRAP_SERVERS sobrescribe el parámetro de configuración bootstrap.servers. Ej. kafka-broker1.com,kafka-broker2.com,kafka-broker3.com
- KAFKA_LISTENERS variable donde se excplicita cuáles son los puertos por cada uno de los protocoles. Ej. PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
- KAFKA_SCHEMA_REGISTRY_URL sobrescribe el parámetro de configuración schema.registry.url. Ej. http://kafka.schema.registry/api/schema-registry/
- KAFKA_GROUP_ID sobrescribe el parámetro de configuración group.id. Ej. test-1
- KAFKA_SECURITY_PROTOCOL sobrescribe el parámetro de configuración security.protocol. Ej. PLAINTEXT o SASL_PLAINTEXT
- KAFKA_USERNAME nombre de usuario para la autenticación SASL-SCRAM, si security.protocol es SASL_PLAINTEXT
- KAFKA_PASSWORD contraseña para la autenticación SASL-SCRAM, si security.protocol es SASL_PLAINTEXT
Cómo ejecutar el sidecar en mi servicio
Nuestra forma de usar el Sidecar es lanzar un proceso dentro del contenedor del servicio, a parte del proceso del servicio. Para esto hay que cambiar la ejecución de los servicios, ya que si ejecutamos 2 procesos al lanzar Docker no gestiona ambos servicios.
Para tener esta gestión utilizamos la herramienta supervisord de Linux, que ya se encarga de gestionar ambos procesos y si hay algún problema con alguno lo levanta. Por tanto, lo que tenemos que hacer es configurar el supervisord para que ejecute nuestro servicio. Un ejemplo de un fichero supervisord.conf que ejecuta un servicio node:
conf
[supervisord]
nodaemon=true
[program:mainservice]
directory=/usr/src/app
command=node ./src/index.js
autostart=true
autorestart=true
stdout_events_enabled = true
stderr_events_enabled = true
environment=NODE_ENV="production"
[supervisord]
nodaemon=true
[program:mainservice]
directory=/usr/src/app
command=node ./src/index.js
autostart=true
autorestart=true
stdout_events_enabled = true
stderr_events_enabled = true
environment=NODE_ENV="production"
Finalmente tenemos que cambiar el DockerFile:
- Primero hay que elegir la imagen base que contenga el sidecar para el lenguaje que usemos.
dockerfile
FROM dockerprivatehub-on.azurecr.io/argo_sidecar:1.0.1
FROM dockerprivatehub-on.azurecr.io/argo_sidecar:1.0.1
- Añadimos la configuración del supervisord al directorio del docker, para tener la configuración del servicio y la del sidecar.
dockerfile
ENV PYTHONUNBUFFERED 1
COPY supervisord.conf /etc/supervisor/conf.d/example-sidecar.conf
ENV PYTHONUNBUFFERED 1
COPY supervisord.conf /etc/supervisor/conf.d/example-sidecar.conf
- Finalmente ejecutamos el servicio con el supervisord^
dockerfile
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/supervisord.conf"]
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/supervisord.conf"]
Cómo publicar mensajes
Para poder producir mensajes primero tenemos que tener en cuenta la configuración.
En la configuración tenemos que tener en cuenta varias cosas:
- schemaConfiguration: Crear un mapa con los schemas que vamos a usar, indicando un nombre del schema que posteriormente utilizaremos al publicar un mensaje.
- topicConfiguration: Indicar la configuración default para afectar todos los topics. Y en el producerConfiguration los siguientes campos:
- bootstrap.servers: Esta propiedad es opcional y sólo hay que usarla si no está configurada por defecto en el sidecar. La ip del kafka al que conectarse, puede ser una ip o todas las del clúster.
- schema.registry.url: Esta propiedad es opcional y sólo hay que usarla si no está configurada por defecto en el sidecar. La url del schema registry dónde se guardarán los schemas, puede ser una url o varias.
- multiple.schemas: Esta propiedad es opcional y sólo hay que usarla si queremos varios schemas en un mismo topics. En dicho caso, pondremos de valor: true
Ejemplo:
javascript
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();
var configureParams = {
schemaConfiguration: {"nombreSchema": {definition: avroSchema}},
topicConfiguration: {
"_default_": {
producerConfiguration: {
"multiple.schemas": "true"
}
}
}
}
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();
var configureParams = {
schemaConfiguration: {"nombreSchema": {definition: avroSchema}},
topicConfiguration: {
"_default_": {
producerConfiguration: {
"multiple.schemas": "true"
}
}
}
}
A continuación tenemos que utilizar el client precompilado Node, para hacer esto se tiene que añadir al fichero .npmrc de la carpeta del proyecto, el repositorio:
@datahub:registry= "http://nexus2.absapp.net/repository/npm-snapshots/"
@datahub:registry= "http://nexus2.absapp.net/repository/npm-snapshots/"
Luego se instala el client
npm install --save @datahub/datahub-client-node@1.0.8
npm install --save @datahub/datahub-client-node@1.0.8
Entonces se tiene que importar el modulo, utilizando la generación de código dinámica. También hay código estático generado e incluido en el paquete, pero si se utiliza el código estático no se puede pasar objectos Json a grpc y se tiene que utilizar las clases de modelo y los métodos setter para crear los objectos.
javascript
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();
var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;
DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();
var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;
DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
Una vez con el service definido, configuramos el sidecar y a continuación creamos el stream para publicar datos:
javascript
DataHubService.configure(configureParams, ()=>{
var writeStream = DataHubService.write();
var account = {
id: "uuid",
nombre: "pepe"
};
var writeParams = {
key: ""+account.id,
type: "nombreSchema",
topic: "account",
value: JSON.stringify(account)
};
writeStream.write(writeParams);
writeStream.on('data', function(response) {
console.log(response);
});
writeStream.on('end', function(e) {
console.log("end");
onEnd();
});
writeStream.on('error', function(e) {
console.error(e);
onEnd();
});
});
DataHubService.configure(configureParams, ()=>{
var writeStream = DataHubService.write();
var account = {
id: "uuid",
nombre: "pepe"
};
var writeParams = {
key: ""+account.id,
type: "nombreSchema",
topic: "account",
value: JSON.stringify(account)
};
writeStream.write(writeParams);
writeStream.on('data', function(response) {
console.log(response);
});
writeStream.on('end', function(e) {
console.log("end");
onEnd();
});
writeStream.on('error', function(e) {
console.error(e);
onEnd();
});
});
Cómo consumir mensajes
Para poder consumir mensajes primero tenemos que tener en cuenta la configuración.
En la configuración tenemos que tener en cuenta varias cosas:
- topicConfiguration: Indicar la configuración default para afectar todos los topics. Y en el consumerConfiguration los siguientes campos:
- bootstrap.servers: Esta propiedad es opcional y sólo hay que usarla si no está configurada por defecto en el sidecar. La ip del kafka al que conectarse, puede ser una ip o todas las del clúster.
- schema.registry.url: Esta propiedad es opcional y sólo hay que usarla si no está configurada por defecto en el sidecar. La url del schema registry dónde se guardarán los schemas, puede ser una url o varias.
- group.id: Esta propiedad es opcional y sólo hay que usarla si no está configurada por defecto en el sidecar. Nombre del ConsumerGroup que se usa para consumir. Altamente recomendable tener distintos nombres por servicio, para que cada servicio lleve su propio orden de consumición, sino cuando un servicio lo procese el otro servicio no lo hará. En cambio, un servicio con distintas instancias utilizará el mismo nombre para repartir las particiones.
Ejemplo:
javascript
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();
var configureParams = {
topicConfiguration: {
"_default_": {
consumerConfiguration: {
"group.id": "Nombre del ConsumerGroup"
}
}
}
}
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();
var configureParams = {
topicConfiguration: {
"_default_": {
consumerConfiguration: {
"group.id": "Nombre del ConsumerGroup"
}
}
}
}
A continuación tenemos que utilizar el client precompilado Node, para hacer esto se tiene que añadir al fichero .npmrc de la carpeta del proyecto, el repositorio:
@datahub:registry= "http://nexus2.absapp.net/repository/npm-snapshots/"
@datahub:registry= "http://nexus2.absapp.net/repository/npm-snapshots/"
Luego se instala el client
npm install --save @datahub/datahub-client-node@1.0.8
npm install --save @datahub/datahub-client-node@1.0.8
Entonces se tiene que importar el modulo, utilizando la generación de código dinámica. También hay código estático generado e incluido en el paquete, pero si se utiliza el código estático no se puede pasar objectos Json a grpc y se tiene que utilizar las clases de modelo y los métodos setter para crear los objectos.
javascript
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();
var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;
DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();
var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;
DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
Una vez con el service definido, configuramos el sidecar y a continuación creamos el stream para publicar datos:
javascript
var ListenOp = protoDescriptor.aquacis.swap.Proto.ListenParams.ListenOp;
DataHubService.configure(configureParams, ()=>{
if(error){
console.log("cannot connect, retrying in 5 seconds");
onError();
return;
}
var listen = DataHubService.listen();
listen.write({op: ListenOp.SUBSCRIBE, topic: "account"});
listen.on('data', function(update) {
console.log(update);
//Ejecuto procesamiento
//Si el procesamiento ha ido bien
listen.write({op: ListenOp.COMMIT});
//Si el procesamiento ha fallado en algún momento
listen.write({op: ListenOp.RETRY});
});
stream.on('end', function() {
console.log("stream ended", arguments);
onError(stream);
});
stream.on('error', function(e) {
console.error(e);
onError(stream);
});
});
var ListenOp = protoDescriptor.aquacis.swap.Proto.ListenParams.ListenOp;
DataHubService.configure(configureParams, ()=>{
if(error){
console.log("cannot connect, retrying in 5 seconds");
onError();
return;
}
var listen = DataHubService.listen();
listen.write({op: ListenOp.SUBSCRIBE, topic: "account"});
listen.on('data', function(update) {
console.log(update);
//Ejecuto procesamiento
//Si el procesamiento ha ido bien
listen.write({op: ListenOp.COMMIT});
//Si el procesamiento ha fallado en algún momento
listen.write({op: ListenOp.RETRY});
});
stream.on('end', function() {
console.log("stream ended", arguments);
onError(stream);
});
stream.on('error', function(e) {
console.error(e);
onError(stream);
});
});
Otras consideraciones
Es importante tener en cuenta la gestión de la conexión con el sidecar, para ellos hay que hacer reintentos de conexión. En nuetro ejemplo, de eso se encarga la función onError:
javascript
var onError = (clientSocket)=>{
if(clientSocket) clientSocket.end();
setTimeout(()=>{
//Volver a conectarse al sidecar
}, 5000);
}
var onError = (clientSocket)=>{
if(clientSocket) clientSocket.end();
setTimeout(()=>{
//Volver a conectarse al sidecar
}, 5000);
}
También es importante recalcar que para node se ha creado una librería que realiza todas estas tareas de gestión. Se puede encontrar en el PlatformNodeTools.