Skip to content

Sidecar hasta 2.0.1

Sidecar es un patrón que es muy útil en el mundo de los microservicios, ya que nos permite abstraer funcionalidad genérica de todos los servicios y además, no tener que hacer librerias en cada uno de estos lenguajes para cada funcionalidad (Más info link).

Sidecar

En nuestra implementación para kafka, lo hemos usado para abstraer a los servicios de:

  • El driver a usar para conectar a kafka. Había problemas de kafka con node y el schema registry.
  • Desplegar funcionalidad a todos los servicios, sin tener que hacer grandes cambios en los servicios. (Seguridad, multitenant, etc)
  • El funcionamiento al detalle del consumidor/publicador de kafka.
  • La gestión de Avro.

Flujos

Actualmente el Sidecar dispone de 2 flujos de funcionamiento. Y todas las comunicaciones entre el servicio y el sidecar es usando grpc.

  • Flujo para consumir datos:
    1. Se inicia con el nombre de los topics con el configure, entonces recibiremos el primer dato.
    2. Si procesamos bien el dato, mandamos la opción commit y recibiremos el siguiente mensaje.
    3. En caso de fallo al procesar se envía la opción retry y recibimos el último dato que no hemos hecho commit.

Sidecar Listen

  • Flujo para producir datos:
    1. Enviamos el schema con la llamada schema
    2. Si es necesario hacemos el configure para otros parámetros
    3. Enviamos write con el topic, la clave y los datos del mensaje.
    4. Se recibirá un mensaje con información sobre si ha ido bien o el error. Sidecar Write

Configuración con variables de entorno

Se puede arrancar el Sidecar ya con la configuración predefinida a través de variables de entorno. Si estas variables estan configuradas, sobrescriben las variables que se pasan a través del op=CONFIGURE.

  • KAFKA_BOOTSTRAP_SERVERS sobrescribe el parámetro de configuración bootstrap.servers. Ej. kafka-broker1.com,kafka-broker2.com,kafka-broker3.com
  • KAFKA_LISTENERS variable donde se excplicita cuáles son los puertos por cada uno de los protocoles. Ej. PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
  • KAFKA_SCHEMA_REGISTRY_URL sobrescribe el parámetro de configuración schema.registry.url. Ej. http://kafka.schema.registry/api/schema-registry/
  • KAFKA_GROUP_ID sobrescribe el parámetro de configuración group.id. Ej. test-1
  • KAFKA_SECURITY_PROTOCOL sobrescribe el parámetro de configuración security.protocol. Ej. PLAINTEXT o SASL_PLAINTEXT
  • KAFKA_USERNAME nombre de usuario para la autenticación SASL-SCRAM, si security.protocol es SASL_PLAINTEXT
  • KAFKA_PASSWORD contraseña para la autenticación SASL-SCRAM, si security.protocol es SASL_PLAINTEXT

Cómo ejecutar el sidecar en mi servicio

Nuestra forma de usar el Sidecar es lanzar un proceso dentro del contenedor del servicio, a parte del proceso del servicio. Para esto hay que cambiar la ejecución de los servicios, ya que si ejecutamos 2 procesos al lanzar Docker no gestiona ambos servicios.

Para tener esta gestión utilizamos la herramienta supervisord de Linux, que ya se encarga de gestionar ambos procesos y si hay algún problema con alguno lo levanta. Por tanto, lo que tenemos que hacer es configurar el supervisord para que ejecute nuestro servicio. Un ejemplo de un fichero supervisord.conf que ejecuta un servicio node:

conf
[supervisord]
nodaemon=true
 
[program:mainservice]
directory=/usr/src/app
command=node ./src/index.js
autostart=true
autorestart=true
stdout_events_enabled = true
stderr_events_enabled = true
environment=NODE_ENV="production"
[supervisord]
nodaemon=true
 
[program:mainservice]
directory=/usr/src/app
command=node ./src/index.js
autostart=true
autorestart=true
stdout_events_enabled = true
stderr_events_enabled = true
environment=NODE_ENV="production"

Finalmente tenemos que cambiar el DockerFile:

  • Primero hay que elegir la imagen base que contenga el sidecar para el lenguaje que usemos.
dockerfile
FROM dockerprivatehub-on.azurecr.io/argo_sidecar:1.0.1
FROM dockerprivatehub-on.azurecr.io/argo_sidecar:1.0.1
  • Añadimos la configuración del supervisord al directorio del docker, para tener la configuración del servicio y la del sidecar.
dockerfile
ENV PYTHONUNBUFFERED 1
COPY supervisord.conf /etc/supervisor/conf.d/example-sidecar.conf
ENV PYTHONUNBUFFERED 1
COPY supervisord.conf /etc/supervisor/conf.d/example-sidecar.conf
  • Finalmente ejecutamos el servicio con el supervisord^
dockerfile
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/supervisord.conf"]
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/supervisord.conf"]

Cómo publicar mensajes

Para poder producir mensajes

  1. se tiene que llamar como primero el método write

  2. se tiene que enviar un WriteParams con un identificador y uno schema Avro (op=SCHEMA)

  3. se puede enviar más que uno schema si se tiene que escribir diferentes tipos de mensajes

  4. se puede enviar un WriteParams con una configuración, que se va a añadir a la configuración con variables de entorno (op=CONFIGURE)

  5. se tiene que enviar un o más WriteParams con type, key (opcional), value y topic (op=WRITE)

Ejemplo:

Lee uno schema Avro

javascript
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();

A continuación tenemos que utilizar el client precompilado Node, para hacer esto se tiene que añadir al fichero .npmrc de la carpeta del proyecto, el repositorio:

@datahub:registry= "http://nexus2.absapp.net/repository/npm-snapshots/"
@datahub:registry= "http://nexus2.absapp.net/repository/npm-snapshots/"

Luego se instala el client

npm install --save @datahub/datahub-client-node@2.0.1
npm install --save @datahub/datahub-client-node@2.0.1

Entonces se tiene que importar el modulo, utilizando la generación de código dinámica. También hay código estático generado e incluido en el paquete, pero si se utiliza el código estático no se puede pasar objectos Json a grpc y se tiene que utilizar las clases de modelo y los métodos setter para crear los objectos.

javascript
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();

var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;

DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();

var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;

DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());

Una vez con el service definido, creamos el stream para publicar datos

javascript
var writeStream = DataHubService.write();
var writeStream = DataHubService.write();

Si se necesita, se puede llamar un op=CONFIGURE

javascript
var configureParams = {
        op: WriteOp.CONFIGURE,
        configuration: {
            properties: {
                "multiple.schemas": "true" 
                // si se tiene que escribir mensajes 
                // con varios schemas en el mismo topic
            }
        }
}
var configureParams = {
        op: WriteOp.CONFIGURE,
        configuration: {
            properties: {
                "multiple.schemas": "true" 
                // si se tiene que escribir mensajes 
                // con varios schemas en el mismo topic
            }
        }
}

Entonces se escribe el schema y a continuación los mensajes:

javascript
var account = {
        id: "uuid",
        nombre: "pepe"
};


var schemaParams = {
        op: WriteOp.SCHEMA,
        type: "nombreSchema",
        schema:avroSchema
};

var writeParams = {
        op: WriteOp.WRITE,
        key: ""+account.id,
        type: "nombreSchema",
        topic: "account",
        value: JSON.stringify(account)
};

writeStream.write(schemaParams);

writeStream.write(writeParams);

writeStream.on('data', function(response) {
    console.log(response);
});
writeStream.on('end', function(e) {
    console.log("end");
    onEnd();
});
writeStream.on('error', function(e) {
    console.error(e);
    onEnd();
});
var account = {
        id: "uuid",
        nombre: "pepe"
};


var schemaParams = {
        op: WriteOp.SCHEMA,
        type: "nombreSchema",
        schema:avroSchema
};

var writeParams = {
        op: WriteOp.WRITE,
        key: ""+account.id,
        type: "nombreSchema",
        topic: "account",
        value: JSON.stringify(account)
};

writeStream.write(schemaParams);

writeStream.write(writeParams);

writeStream.on('data', function(response) {
    console.log(response);
});
writeStream.on('end', function(e) {
    console.log("end");
    onEnd();
});
writeStream.on('error', function(e) {
    console.error(e);
    onEnd();
});

Propiedades de configuración del productor

  • bootstrap.servers: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La ip del kafka al que conectarse, puede ser una ip o todas las del clúster.
  • schema.registry.url: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La url del schema registry dónde se guardarán los schemas, puede ser una url o varias.
  • multiple.schemas: Esta propiedad es opcional y solo hay que usarla si queremos varios schemas en un mismo topics. En dicho caso, pondremos de valor: true

También se pueden añadir otras propiedades del productor Kafka

Cómo consumir mensajes

Primero tenemos que utilizar el client precompilado Node, para hacer esto se tiene que añadir al fichero .npmrc de la carpeta del proyecto, el repositorio:

@datahub:registry= "http://nexus2.absapp.net/repository/npm-snapshots/"
@datahub:registry= "http://nexus2.absapp.net/repository/npm-snapshots/"

Luego se instala el client

npm install --save @datahub/datahub-client-node@2.0.1
npm install --save @datahub/datahub-client-node@2.0.1

Entonces se tiene que importar el modulo, utilizando la generación de código dinámica. También hay código estático generado e incluido en el paquete, pero si se utiliza el código estático no se puede pasar objectos Json a grpc y se tiene que utilizar las clases de modelo y los métodos setter para crear los objectos.

javascript
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();

var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;

DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();

var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;

DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());

Una vez con el service definido, creamos el stream para recibir datos

javascript
var listenStream = DataHubService.listen();
var listenStream = DataHubService.listen();

Después enviamos un mensaje para configurar el stream

javascript
var configureParams = {
    op: ListenOp.CONFIGURE,
    configuration: {
        topics: ["topic1","topic2"]
    }
};

listenStream.write(configureParams);
var configureParams = {
    op: ListenOp.CONFIGURE,
    configuration: {
        topics: ["topic1","topic2"]
    }
};

listenStream.write(configureParams);

Entonces recibimos los updates y hacemos COMMIT o RETRY, dependiendo que el procesamiento haya ido bien o haya fallado.

javascript
listenStream.on('data', function(update) {
    console.log(update);
    //Ejecuto procesamiento
    //Si el procesamiento ha ido bien
    listen.write({op: ListenOp.COMMIT});
    //Si el procesamiento ha fallado en algún momento
    listen.write({op: ListenOp.RETRY}); 
});

listenStream.on('end', function() {
    console.log("stream ended", arguments);
    onError(stream);
});
listenStream.on('error', function(e) {
    console.error(e);
    onError(stream);
});
listenStream.on('data', function(update) {
    console.log(update);
    //Ejecuto procesamiento
    //Si el procesamiento ha ido bien
    listen.write({op: ListenOp.COMMIT});
    //Si el procesamiento ha fallado en algún momento
    listen.write({op: ListenOp.RETRY}); 
});

listenStream.on('end', function() {
    console.log("stream ended", arguments);
    onError(stream);
});
listenStream.on('error', function(e) {
    console.error(e);
    onError(stream);
});

Propiedades de configuración del consumidor

  • topics un array de topics que se tienen que escuchar

  • properties propiedades del consumidor Kafka, en particular

    • bootstrap.servers: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La ip del kafka al que conectarse, puede ser una ip o todas las del clúster.
    • schema.registry.url: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La url del schema registry dónde se guardarán los schemas, puede ser una url o varias.
    • group.id: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Nombre del ConsumerGroup que se usa para consumir. Altamente recomendable tener distintos nombres por servicio, para que cada servicio lleve su propio orden de consumición, sino cuando un servicio lo procese el otro servicio no lo hará. En cambio, un servicio con distintas instancias utilizará el mismo nombre para repartir las particiones.

    También se pueden añadir otras propiedades del consumidor Kafka

Otras consideraciones

Es importante tener en cuenta la gestión de la conexión con el sidecar, para ellos hay que hacer reintentos de conexión. En nuetro ejemplo, de eso se encarga la función onError:

javascript
var onError = (clientSocket)=>{
    if(clientSocket) clientSocket.end();
    setTimeout(()=>{
        //Volver a conectarse al sidecar
    }, 5000);
}
var onError = (clientSocket)=>{
    if(clientSocket) clientSocket.end();
    setTimeout(()=>{
        //Volver a conectarse al sidecar
    }, 5000);
}

También es importante recalcar que para node se ha creado una librería que realiza todas estas tareas de gestión. Se puede encontrar en el PlatformNodeTools.