Appearance
Sidecar última (>=3.1.5)
Sidecar es un patrón que es muy útil en el mundo de los microservicios, ya que nos permite abstraer funcionalidad genérica de todos los servicios y además, no tener que hacer librerias en cada uno de estos lenguajes para cada funcionalidad (Más info link).
En nuestra implementación para kafka, lo hemos usado para abstraer a los servicios de:
- El driver a usar para conectar a kafka. Había problemas de kafka con node y el schema registry.
- Desplegar funcionalidad a todos los servicios, sin tener que hacer grandes cambios en los servicios. (Seguridad, multitenant, etc)
- El funcionamiento al detalle del consumidor/publicador de kafka.
- La gestión de Avro.
Flujos
Actualmente el Sidecar dispone de 2 flujos de funcionamiento. Y todas las comunicaciones entre el servicio y el sidecar es usando grpc.
- Flujo para consumir datos:
- Se inicia con el nombre de los topics con el configure, entonces recibiremos el primer dato.
- Si procesamos bien el dato, mandamos la opción commit y recibiremos el siguiente mensaje.
- En caso de fallo al procesar se envía la opción retry y recibimos el último dato que no hemos hecho commit.
- Flujo para producir datos:
- Enviamos el schema con la llamada schema
- Si es necesario hacemos el configure para otros parámetros
- Enviamos write con el topic, la clave y los datos del mensaje.
- Se recibirá un mensaje con información sobre si ha ido bien o el error.
Configuración con variables de entorno
Se puede arrancar el Sidecar ya con la configuración predefinida a través de variables de entorno. Si estas variables estan configuradas, sobrescriben las variables que se pasan a través del op=CONFIGURE.
- KAFKA_BOOTSTRAP_SERVERS sobrescribe el parámetro de configuración bootstrap.servers. Ej. 10.18.179.33,10.18.179.34,10.18.179.35
- KAFKA_LISTENERS variable donde se excplicita cuáles son los puertos por cada uno de los protocoles. Ej. PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
- KAFKA_SCHEMA_REGISTRY_URL sobrescribe el parámetro de configuración schema.registry.url. Ej. http://10.18.179.35/api/schema-registry/
- KAFKA_GROUP_ID sobrescribe el parámetro de configuración group.id. Ej. test-1
- KAFKA_SECURITY_PROTOCOL sobrescribe el parámetro de configuración security.protocol. Ej. PLAINTEXT o SASL_PLAINTEXT
- KAFKA_USERNAME nombre de usuario para la autenticación SASL-SCRAM, si security.protocol es SASL_PLAINTEXT
- KAFKA_PASSWORD contraseña para la autenticación SASL-SCRAM, si security.protocol es SASL_PLAINTEXT
- SIDECAR_MAX_MESSAGE_SIZE tamaño en bytes del mensaje mas grande que se puede enviar y recibir de Kafka (a partir de la 2.4.21)
- KAFKA_SSL_KEYSTORE corresponde a ssl.keystore.location (a partir de la 2.6.0)
- KAFKA_SSL_TRUSTSTORE corresponde a ssl.truststore.location (a partir de la 2.6.0)
- KAFKA_SSL_KEY_PASSWORD corresponde a ssl.key.password (a partir de la 2.6.0)
- KAFKA_SSL_KEYSTORE_PASSWORD corresponde a ssl.keystore.password (a partir de la 2.6.0)
- KAFKA_SSL_TRUSTSTORE_PASSWORD corresponde a ssl.truststore.password (a partir de la 2.6.0)
- KAFKA_SCRAM_SASL_MECHANISM corresponde a sasl.mechanism (a partir de la 2.6.0)
- SIDECAR_DRAIN_TIMEOUT corresponde a sidecar.drain.timeout (a partir de la 3.1.4)
- SIDECAR_COMMIT_TIMEOUT corresponde a sidecar.commit.timeout (a partir de la 3.1.4)
- SIDECAR_COMMIT_NOTIFICATION_MAX_RETRIES corresponde a sidecar.commit.notification.max.retries (a partir de la 3.1.4)
- SIDECAR_ALWAYS_COMMIT_NOTIFICATION corresponde a sidecar.commit.notification.always (a partir de la 3.1.4)
Cómo ejecutar el sidecar en mi servicio
El sidecar está pensado para ser lanzado como container de un pod en Kubernetes, para la ejecución en Docker Swarm con supervisor hay que mirar la versión 2.8.4.
Un ejemplo de contenedor para un deployment Kubernetes:
yaml
- name: sidecar
image: nautilus.azurecr.io/nautilus_sidecar:4.2.2
imagePullPolicy: "Always"
command: ["/usr/sidecar/init.sh"]
ports:
- containerPort: 8080
- containerPort: 8081
env:
- name: SIDECAR_JAVA_OPTS
value: "-Xms128m -Xmx256m"
- name: KAFKA_BOOTSTRAP_SERVERS
valueFrom:
configMapKeyRef:
name: iot-{{env_name}}-cfg
key: kafka.server
- name: KAFKA_LISTENERS
valueFrom:
configMapKeyRef:
name: iot-{{env_name}}-cfg
key: kafka.listener
- name: KAFKA_SCHEMA_REGISTRY_URL
valueFrom:
configMapKeyRef:
name: iot-{{env_name}}-cfg
key: kafka.schema.registry.url
- name: KAFKA_SECURITY_PROTOCOL
valueFrom:
configMapKeyRef:
name: iot-{{env_name}}-cfg
key: kafka.security.protocol
- name: JAEGER_SERVICE_NAME
value: iot_links
- name: JAEGER_AGENT_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: KAFKA_GROUP_ID
value: iot.links.1
- name: HOST
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: KAFKA_USERNAME
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-iot
key: username
- name: KAFKA_PASSWORD
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-iot
key: password
- name: KAFKA_SSL_KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-client
key: keystore-password
- name: KAFKA_SSL_TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-client
key: truststore-password
- name: KAFKA_SSL_KEY_PASSWORD
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-client
key: key-password
volumeMounts:
- name: kafka-client
mountPath: "/usr/sidecar/keystore/kafka.client.keystore.jks"
subPath: "kafka.client.keystore.jks"
- name: kafka-client
mountPath: "/usr/sidecar/keystore/kafka.client.truststore.jks"
subPath: "kafka.client.truststore.jks"
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 10m
memory: 256Mi
readinessProbe:
httpGet:
path: /health
port: 8081
failureThreshold: 1
initialDelaySeconds: 120
periodSeconds: 5
successThreshold: 1
timeoutSeconds: 5
livenessProbe:
httpGet:
path: /health
port: 8081
failureThreshold: 3
initialDelaySeconds: 120
periodSeconds: 30
successThreshold: 1
timeoutSeconds: 5
- name: sidecar
image: nautilus.azurecr.io/nautilus_sidecar:4.2.2
imagePullPolicy: "Always"
command: ["/usr/sidecar/init.sh"]
ports:
- containerPort: 8080
- containerPort: 8081
env:
- name: SIDECAR_JAVA_OPTS
value: "-Xms128m -Xmx256m"
- name: KAFKA_BOOTSTRAP_SERVERS
valueFrom:
configMapKeyRef:
name: iot-{{env_name}}-cfg
key: kafka.server
- name: KAFKA_LISTENERS
valueFrom:
configMapKeyRef:
name: iot-{{env_name}}-cfg
key: kafka.listener
- name: KAFKA_SCHEMA_REGISTRY_URL
valueFrom:
configMapKeyRef:
name: iot-{{env_name}}-cfg
key: kafka.schema.registry.url
- name: KAFKA_SECURITY_PROTOCOL
valueFrom:
configMapKeyRef:
name: iot-{{env_name}}-cfg
key: kafka.security.protocol
- name: JAEGER_SERVICE_NAME
value: iot_links
- name: JAEGER_AGENT_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: KAFKA_GROUP_ID
value: iot.links.1
- name: HOST
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: KAFKA_USERNAME
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-iot
key: username
- name: KAFKA_PASSWORD
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-iot
key: password
- name: KAFKA_SSL_KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-client
key: keystore-password
- name: KAFKA_SSL_TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-client
key: truststore-password
- name: KAFKA_SSL_KEY_PASSWORD
valueFrom:
secretKeyRef:
name: nautilus-datahub-{{env_name}}-kafka-client
key: key-password
volumeMounts:
- name: kafka-client
mountPath: "/usr/sidecar/keystore/kafka.client.keystore.jks"
subPath: "kafka.client.keystore.jks"
- name: kafka-client
mountPath: "/usr/sidecar/keystore/kafka.client.truststore.jks"
subPath: "kafka.client.truststore.jks"
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 10m
memory: 256Mi
readinessProbe:
httpGet:
path: /health
port: 8081
failureThreshold: 1
initialDelaySeconds: 120
periodSeconds: 5
successThreshold: 1
timeoutSeconds: 5
livenessProbe:
httpGet:
path: /health
port: 8081
failureThreshold: 3
initialDelaySeconds: 120
periodSeconds: 30
successThreshold: 1
timeoutSeconds: 5
Cómo publicar mensajes
Para poder producir mensajes
se tiene que llamar como primero el método write
se tiene que enviar un WriteParams con un identificador y uno schema Avro (op=SCHEMA)
se puede enviar más que uno schema si se tiene que escribir diferentes tipos de mensajes
se puede enviar un WriteParams con una configuración, que se va a añadir a la configuración con variables de entorno (op=CONFIGURE)
se tiene que enviar un o más WriteParams con type, key (opcional), value y topic (op=WRITE)
Ejemplo:
Lee uno schema Avro
javascript
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();
A continuación tenemos que utilizar el client precompilado Node, para hacer esto se tiene que añadir al fichero .npmrc de la carpeta del proyecto, el repositorio de Azure DevOps / nautilus.
Luego se instala el client
npm install --save @datahub/datahub-client-node@2.3.19
npm install --save @datahub/datahub-client-node@2.3.19
Entonces se tiene que importar el modulo, utilizando la generación de código dinámica. También hay código estático generado e incluido en el paquete, pero si se utiliza el código estático no se puede pasar objectos Json a grpc y se tiene que utilizar las clases de modelo y los métodos setter para crear los objectos.
javascript
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();
var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;
DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();
var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;
DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
Una vez con el service definido, creamos el stream para publicar datos
javascript
var writeStream = DataHubService.write();
var writeStream = DataHubService.write();
Si se necesita, se puede llamar un op=CONFIGURE
javascript
var configureParams = {
op: WriteOp.CONFIGURE,
configuration: {
properties: {
"multiple.schemas": "true"
// si se tiene que escribir mensajes
// con varios schemas en el mismo topic
}
}
}
var configureParams = {
op: WriteOp.CONFIGURE,
configuration: {
properties: {
"multiple.schemas": "true"
// si se tiene que escribir mensajes
// con varios schemas en el mismo topic
}
}
}
luego se escribe el schema y a continuación los mensajes:
javascript
var account = {
id: "uuid",
nombre: "pepe"
};
var schemaParams = {
op: WriteOp.SCHEMA,
type: "nombreSchema",
schema:avroSchema
};
var writeParams = {
op: WriteOp.WRITE,
key: ""+account.id,
type: "nombreSchema",
topic: "account",
value: JSON.stringify(account)
};
writeStream.write(schemaParams);
writeStream.write(writeParams);
writeStream.on('data', function(response) {
console.log(response);
});
writeStream.on('end', function(e) {
console.log("end");
onEnd();
});
writeStream.on('error', function(e) {
console.error(e);
onEnd();
});
var account = {
id: "uuid",
nombre: "pepe"
};
var schemaParams = {
op: WriteOp.SCHEMA,
type: "nombreSchema",
schema:avroSchema
};
var writeParams = {
op: WriteOp.WRITE,
key: ""+account.id,
type: "nombreSchema",
topic: "account",
value: JSON.stringify(account)
};
writeStream.write(schemaParams);
writeStream.write(writeParams);
writeStream.on('data', function(response) {
console.log(response);
});
writeStream.on('end', function(e) {
console.log("end");
onEnd();
});
writeStream.on('error', function(e) {
console.error(e);
onEnd();
});
Propiedades de configuración del productor
- bootstrap.servers: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La ip del kafka al que conectarse, puede ser una ip o todas las del clúster.
- schema.registry.url: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La url del schema registry dónde se guardarán los schemas, puede ser una url o varias.
- multiple.schemas: Esta propiedad es opcional y solo hay que usarla si queremos varios schemas en un mismo topics. En dicho caso, pondremos de valor: true
También se pueden añadir otras propiedades del productor Kafka
Cómo consumir mensajes
Primero tenemos que utilizar el client precompilado Node, para hacer esto se tiene que añadir al fichero .npmrc de la carpeta del proyecto, el repositorio de Azure DevOps / nautilus.
Luego se instala el client
npm install --save @datahub/datahub-client-node@2.3.19
npm install --save @datahub/datahub-client-node@2.3.19
Entonces se tiene que importar el modulo, utilizando la generación de código dinámica. También hay código estático generado e incluido en el paquete, pero si se utiliza el código estático no se puede pasar objectos Json a grpc y se tiene que utilizar las clases de modelo y los métodos setter para crear los objectos.
javascript
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();
var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;
DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();
var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;
DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
Una vez con el service definido, creamos el stream para recibir datos
javascript
var listenStream = DataHubService.listen();
var listenStream = DataHubService.listen();
Después enviamos un mensaje para configurar el stream
javascript
var configureParams = {
op: ListenOp.CONFIGURE,
configuration: {
topics: ["topic1","topic2"],
//batch: 1 // por defecto a 1
properties: {
// se pueden usar la mismas propiedades del consumer Kafka
}
}
};
listenStream.write(configureParams);
var configureParams = {
op: ListenOp.CONFIGURE,
configuration: {
topics: ["topic1","topic2"],
//batch: 1 // por defecto a 1
properties: {
// se pueden usar la mismas propiedades del consumer Kafka
}
}
};
listenStream.write(configureParams);
Entonces recibimos los updates y hacemos COMMIT o RETRY, dependiendo que el procesamiento haya ido bien o haya fallado.
Si el sidecar tarda mas de sidecar.drain.timeout millisegundos en recibir un mensaje de la cola y no ha terminado un batch, termina antes y envia un mensaje con error.code: COMMIT_NOTIFICATION.
(a partir de la 3.1.4) Si el sidecar está esperando un commit o un retry y no lo recibe en sidecar.commit.timeout millisegundos, envía otro COMMIT_NOTIFICATION y luego otro cada sidecar.commit.timeout millisegundos, hasta que reciba un commit o retry o llegue a sidecar.commit.notification.max.retries. Si llega al maximo numero de reintentos termina el consumidor y el client tiene que lanzar otro método listen.
Si se envía un commit o un retry mientras que se está ejecutando otro, el sidecar devolverá un DUPLICATED_BATCH_TERMINATION_ERROR. Si se envía un commit o un retry y el sidecar no está esperando uno de estos enviará un error de tipo UNEXPECTED_BATCH_TERMINATION_ERROR.
Si se especifica sidecar.commit.notification.always el sidecar enviará un COMMIT_NOTIFICATION cuando termina cada batch, tanto cuando termina por que llega a batch size como cuando haya un drain timeout. De esta manera, el client no tiene que contar el numero de mensajes del batch y solo los contará el sidecar. Esta modalidad no está activa por defecto ya que el batch size por defecto es 1, y está aconsejada solo si el batch size es > 1 y el overhead de tener este mensaje de commit notification es muy poco.
javascript
listenStream.on('data', function(update) {
console.log(update);
//Ejecuto procesamiento
//Si el procesamiento ha ido bien
listen.write({op: ListenOp.COMMIT});
//Si el procesamiento ha fallado en algún momento
listen.write({op: ListenOp.RETRY});
});
listenStream.on('end', function() {
console.log("stream ended", arguments);
onError(stream);
});
listenStream.on('error', function(e) {
console.error(e);
onError(stream);
});
listenStream.on('data', function(update) {
console.log(update);
//Ejecuto procesamiento
//Si el procesamiento ha ido bien
listen.write({op: ListenOp.COMMIT});
//Si el procesamiento ha fallado en algún momento
listen.write({op: ListenOp.RETRY});
});
listenStream.on('end', function() {
console.log("stream ended", arguments);
onError(stream);
});
listenStream.on('error', function(e) {
console.error(e);
onError(stream);
});
Propiedades de configuración del consumidor
topics un array de topics que se tienen que escuchar
properties propiedades del consumidor Kafka, en particular
- bootstrap.servers: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La ip del kafka al que conectarse, puede ser una ip o todas las del clúster.
- schema.registry.url: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La url del schema registry dónde se guardarán los schemas, puede ser una url o varias.
- group.id: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Nombre del ConsumerGroup que se usa para consumir. Altamente recomendable tener distintos nombres por servicio, para que cada servicio lleve su propio orden de consumición, sino cuando un servicio lo procese el otro servicio no lo hará. En cambio, un servicio con distintas instancias utilizará el mismo nombre para repartir las particiones.
- sidecar.drain.timeout: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Timeout despues de que se considera un batch como terminado si no se han recibido mensajes durante este tiempo y no se ha llegado a batch size. Se envía un COMMIT_NOTIFICATION y se espera un commit o un retry. Por defecto es 5000 ms
- sidecar.commit.timeout: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Timeout despues de que si el sidecar está esperando un commit o un retry y no lo recibe, envía otro COMMIT_NOTIFICATION. Por defecto es 1000 ms
- sidecar.commit.notification.max.retries: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Maximo numero de reintentos de commit con COMMIT_NOTIFICATION después de que se termina el bi-directional stream que coresponde a la llamada listen. Por defecto es 10
- sidecar.commit.notification.always: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Si enviar un COMMIT_NOTIFICATION por cada batch terminado, tan cuando el batch termina solo, como cuando un drain timeout envía un commit notification. Permite a los clients de no contar el numero de mensajes y que se haga solo en el sidecar. Por defecto es false
También se pueden añadir otras propiedades del consumidor Kafka
Multitenant
La funcionalidad de Multitenant permite de enviar mensajes a un topic que son específicos de un tenant y solo pueden leerlos los servicios que están autorizados para este topic y tenant.
También se puede empezar a escuchar un topic y si este topic es multitenant, se recibirán los mensajes de todos los tenants que están autorizados al usuario especificado en la configuración del Sidecar.
Cómo publicar mensajes a un topic Multitenant
Para publicar mensajes en un topic multitenant, además del topic, se tiene que especificar un tenantId cuando se escribe el mensaje WRITE
javascript
var writeParams = {
op: WriteOp.WRITE,
type: "<schemaId>",
topic: "<multitenantTopic>",
tenantId: "<tenantId>",
key: "<key>",
value: "<value>"
};
var writeParams = {
op: WriteOp.WRITE,
type: "<schemaId>",
topic: "<multitenantTopic>",
tenantId: "<tenantId>",
key: "<key>",
value: "<value>"
};
Cómo consumir mensajes de un topic Multitenant
Para consumir mensaje de un topic Multitenant solo se tiene que especificar el nombre del topic multitenant. Cuando se recibirán las actualizaciones, estan tendrán un campo tenantId con el id del tenant que ha recibido el mensaje.
javascript
var listenParams = {
op: ListenOp.CONFIGURE,
configuration: {
topics: ["account"]
}
}
...
listen.write(listenParams);
...
listen.on('data', function(update) {
...
console.log(update.tenantId);
...
});
var listenParams = {
op: ListenOp.CONFIGURE,
configuration: {
topics: ["account"]
}
}
...
listen.write(listenParams);
...
listen.on('data', function(update) {
...
console.log(update.tenantId);
...
});
Tambien se puede especificar de escuchar a topic multitenant y no multitenant al mismo tiempo o se puede especificar de escuchar solo a un subconjunto de los tenants de un topic, siempre que el tenant esté autorizado para el usuario. Para hacer esto se tiene que añadir un topic de tipo <multitenantTopic>_<tenantId> en la lista de los topic a que se le subscribe.
javascript
var listenParams = {
op: ListenOp.CONFIGURE,
configuration: {
topics: ["account_123","account_345", "not-multitenant-topic"]
}
}
var listenParams = {
op: ListenOp.CONFIGURE,
configuration: {
topics: ["account_123","account_345", "not-multitenant-topic"]
}
}
Otras consideraciones
Es importante tener en cuenta la gestión de la conexión con el sidecar, para ellos hay que hacer reintentos de conexión. En nuetro ejemplo, de eso se encarga la función onError:
javascript
var onError = (clientSocket)=>{
if(clientSocket) clientSocket.end();
setTimeout(()=>{
//Volver a conectarse al sidecar
}, 5000);
}
var onError = (clientSocket)=>{
if(clientSocket) clientSocket.end();
setTimeout(()=>{
//Volver a conectarse al sidecar
}, 5000);
}
También es importante recalcar que para node se ha creado una librería que realiza todas estas tareas de gestión. Se puede encontrar en el PlatformNodeTools.