Skip to content

Sidecar última (>=3.1.5)

Sidecar es un patrón que es muy útil en el mundo de los microservicios, ya que nos permite abstraer funcionalidad genérica de todos los servicios y además, no tener que hacer librerias en cada uno de estos lenguajes para cada funcionalidad (Más info link).

Sidecar

En nuestra implementación para kafka, lo hemos usado para abstraer a los servicios de:

  • El driver a usar para conectar a kafka. Había problemas de kafka con node y el schema registry.
  • Desplegar funcionalidad a todos los servicios, sin tener que hacer grandes cambios en los servicios. (Seguridad, multitenant, etc)
  • El funcionamiento al detalle del consumidor/publicador de kafka.
  • La gestión de Avro.

Flujos

Actualmente el Sidecar dispone de 2 flujos de funcionamiento. Y todas las comunicaciones entre el servicio y el sidecar es usando grpc.

  • Flujo para consumir datos:
    1. Se inicia con el nombre de los topics con el configure, entonces recibiremos el primer dato.
    2. Si procesamos bien el dato, mandamos la opción commit y recibiremos el siguiente mensaje.
    3. En caso de fallo al procesar se envía la opción retry y recibimos el último dato que no hemos hecho commit.

Sidecar Listen

  • Flujo para producir datos:
    1. Enviamos el schema con la llamada schema
    2. Si es necesario hacemos el configure para otros parámetros
    3. Enviamos write con el topic, la clave y los datos del mensaje.
    4. Se recibirá un mensaje con información sobre si ha ido bien o el error. Sidecar Write

Configuración con variables de entorno

Se puede arrancar el Sidecar ya con la configuración predefinida a través de variables de entorno. Si estas variables estan configuradas, sobrescriben las variables que se pasan a través del op=CONFIGURE.

  • KAFKA_BOOTSTRAP_SERVERS sobrescribe el parámetro de configuración bootstrap.servers. Ej. 10.18.179.33,10.18.179.34,10.18.179.35
  • KAFKA_LISTENERS variable donde se excplicita cuáles son los puertos por cada uno de los protocoles. Ej. PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
  • KAFKA_SCHEMA_REGISTRY_URL sobrescribe el parámetro de configuración schema.registry.url. Ej. http://10.18.179.35/api/schema-registry/
  • KAFKA_GROUP_ID sobrescribe el parámetro de configuración group.id. Ej. test-1
  • KAFKA_SECURITY_PROTOCOL sobrescribe el parámetro de configuración security.protocol. Ej. PLAINTEXT o SASL_PLAINTEXT
  • KAFKA_USERNAME nombre de usuario para la autenticación SASL-SCRAM, si security.protocol es SASL_PLAINTEXT
  • KAFKA_PASSWORD contraseña para la autenticación SASL-SCRAM, si security.protocol es SASL_PLAINTEXT
  • SIDECAR_MAX_MESSAGE_SIZE tamaño en bytes del mensaje mas grande que se puede enviar y recibir de Kafka (a partir de la 2.4.21)
  • KAFKA_SSL_KEYSTORE corresponde a ssl.keystore.location (a partir de la 2.6.0)
  • KAFKA_SSL_TRUSTSTORE corresponde a ssl.truststore.location (a partir de la 2.6.0)
  • KAFKA_SSL_KEY_PASSWORD corresponde a ssl.key.password (a partir de la 2.6.0)
  • KAFKA_SSL_KEYSTORE_PASSWORD corresponde a ssl.keystore.password (a partir de la 2.6.0)
  • KAFKA_SSL_TRUSTSTORE_PASSWORD corresponde a ssl.truststore.password (a partir de la 2.6.0)
  • KAFKA_SCRAM_SASL_MECHANISM corresponde a sasl.mechanism (a partir de la 2.6.0)
  • SIDECAR_DRAIN_TIMEOUT corresponde a sidecar.drain.timeout (a partir de la 3.1.4)
  • SIDECAR_COMMIT_TIMEOUT corresponde a sidecar.commit.timeout (a partir de la 3.1.4)
  • SIDECAR_COMMIT_NOTIFICATION_MAX_RETRIES corresponde a sidecar.commit.notification.max.retries (a partir de la 3.1.4)
  • SIDECAR_ALWAYS_COMMIT_NOTIFICATION corresponde a sidecar.commit.notification.always (a partir de la 3.1.4)

Cómo ejecutar el sidecar en mi servicio

El sidecar está pensado para ser lanzado como container de un pod en Kubernetes, para la ejecución en Docker Swarm con supervisor hay que mirar la versión 2.8.4.

Un ejemplo de contenedor para un deployment Kubernetes:

yaml
        - name: sidecar
          image: nautilus.azurecr.io/nautilus_sidecar:4.2.2
          imagePullPolicy: "Always"
          command: ["/usr/sidecar/init.sh"]
          ports:
            - containerPort: 8080
            - containerPort: 8081
          env:
            - name: SIDECAR_JAVA_OPTS
              value: "-Xms128m -Xmx256m"
            - name: KAFKA_BOOTSTRAP_SERVERS
              valueFrom:
                configMapKeyRef:
                  name: iot-{{env_name}}-cfg
                  key: kafka.server
            - name: KAFKA_LISTENERS
              valueFrom:
                configMapKeyRef:
                  name: iot-{{env_name}}-cfg
                  key: kafka.listener
            - name: KAFKA_SCHEMA_REGISTRY_URL
              valueFrom:
                configMapKeyRef:
                  name: iot-{{env_name}}-cfg
                  key: kafka.schema.registry.url
            - name: KAFKA_SECURITY_PROTOCOL
              valueFrom:
                configMapKeyRef:
                  name: iot-{{env_name}}-cfg
                  key: kafka.security.protocol
            - name: JAEGER_SERVICE_NAME
              value: iot_links
            - name: JAEGER_AGENT_HOST
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
            - name: KAFKA_GROUP_ID
              value: iot.links.1
            - name: HOST
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: KAFKA_USERNAME
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-iot
                  key: username
            - name: KAFKA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-iot
                  key: password
            - name: KAFKA_SSL_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-client
                  key: keystore-password
            - name: KAFKA_SSL_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-client
                  key: truststore-password
            - name: KAFKA_SSL_KEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-client
                  key: key-password
          volumeMounts:
          - name: kafka-client
            mountPath: "/usr/sidecar/keystore/kafka.client.keystore.jks"
            subPath: "kafka.client.keystore.jks"
          - name: kafka-client
            mountPath: "/usr/sidecar/keystore/kafka.client.truststore.jks"
            subPath: "kafka.client.truststore.jks"
          resources:
            limits:
              cpu: 500m
              memory: 512Mi
            requests:
              cpu: 10m
              memory: 256Mi
          readinessProbe:
            httpGet:
              path: /health
              port: 8081
            failureThreshold: 1
            initialDelaySeconds: 120
            periodSeconds: 5
            successThreshold: 1
            timeoutSeconds: 5
          livenessProbe:
            httpGet:
              path: /health
              port: 8081
            failureThreshold: 3
            initialDelaySeconds: 120
            periodSeconds: 30
            successThreshold: 1
            timeoutSeconds: 5
        - name: sidecar
          image: nautilus.azurecr.io/nautilus_sidecar:4.2.2
          imagePullPolicy: "Always"
          command: ["/usr/sidecar/init.sh"]
          ports:
            - containerPort: 8080
            - containerPort: 8081
          env:
            - name: SIDECAR_JAVA_OPTS
              value: "-Xms128m -Xmx256m"
            - name: KAFKA_BOOTSTRAP_SERVERS
              valueFrom:
                configMapKeyRef:
                  name: iot-{{env_name}}-cfg
                  key: kafka.server
            - name: KAFKA_LISTENERS
              valueFrom:
                configMapKeyRef:
                  name: iot-{{env_name}}-cfg
                  key: kafka.listener
            - name: KAFKA_SCHEMA_REGISTRY_URL
              valueFrom:
                configMapKeyRef:
                  name: iot-{{env_name}}-cfg
                  key: kafka.schema.registry.url
            - name: KAFKA_SECURITY_PROTOCOL
              valueFrom:
                configMapKeyRef:
                  name: iot-{{env_name}}-cfg
                  key: kafka.security.protocol
            - name: JAEGER_SERVICE_NAME
              value: iot_links
            - name: JAEGER_AGENT_HOST
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
            - name: KAFKA_GROUP_ID
              value: iot.links.1
            - name: HOST
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: KAFKA_USERNAME
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-iot
                  key: username
            - name: KAFKA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-iot
                  key: password
            - name: KAFKA_SSL_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-client
                  key: keystore-password
            - name: KAFKA_SSL_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-client
                  key: truststore-password
            - name: KAFKA_SSL_KEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: nautilus-datahub-{{env_name}}-kafka-client
                  key: key-password
          volumeMounts:
          - name: kafka-client
            mountPath: "/usr/sidecar/keystore/kafka.client.keystore.jks"
            subPath: "kafka.client.keystore.jks"
          - name: kafka-client
            mountPath: "/usr/sidecar/keystore/kafka.client.truststore.jks"
            subPath: "kafka.client.truststore.jks"
          resources:
            limits:
              cpu: 500m
              memory: 512Mi
            requests:
              cpu: 10m
              memory: 256Mi
          readinessProbe:
            httpGet:
              path: /health
              port: 8081
            failureThreshold: 1
            initialDelaySeconds: 120
            periodSeconds: 5
            successThreshold: 1
            timeoutSeconds: 5
          livenessProbe:
            httpGet:
              path: /health
              port: 8081
            failureThreshold: 3
            initialDelaySeconds: 120
            periodSeconds: 30
            successThreshold: 1
            timeoutSeconds: 5

Cómo publicar mensajes

Para poder producir mensajes

  1. se tiene que llamar como primero el método write

  2. se tiene que enviar un WriteParams con un identificador y uno schema Avro (op=SCHEMA)

  3. se puede enviar más que uno schema si se tiene que escribir diferentes tipos de mensajes

  4. se puede enviar un WriteParams con una configuración, que se va a añadir a la configuración con variables de entorno (op=CONFIGURE)

  5. se tiene que enviar un o más WriteParams con type, key (opcional), value y topic (op=WRITE)

Ejemplo:

Lee uno schema Avro

javascript
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();
const path = require("path");
const fs = require("fs");
var AVRO_SCHEMA_PATH = path.join(__dirname,"..","avro","nombreSchema.avsc");
var avroSchema = fs.readFileSync(AVRO_SCHEMA_PATH).toString();

A continuación tenemos que utilizar el client precompilado Node, para hacer esto se tiene que añadir al fichero .npmrc de la carpeta del proyecto, el repositorio de Azure DevOps / nautilus.

Luego se instala el client

npm install --save @datahub/datahub-client-node@2.3.19
npm install --save @datahub/datahub-client-node@2.3.19

Entonces se tiene que importar el modulo, utilizando la generación de código dinámica. También hay código estático generado e incluido en el paquete, pero si se utiliza el código estático no se puede pasar objectos Json a grpc y se tiene que utilizar las clases de modelo y los métodos setter para crear los objectos.

javascript
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();

var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;

DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();

var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;

DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());

Una vez con el service definido, creamos el stream para publicar datos

javascript
var writeStream = DataHubService.write();
var writeStream = DataHubService.write();

Si se necesita, se puede llamar un op=CONFIGURE

javascript
var configureParams = {
        op: WriteOp.CONFIGURE,
        configuration: {
            properties: {
                "multiple.schemas": "true" 
                // si se tiene que escribir mensajes 
                // con varios schemas en el mismo topic
            }
        }
}
var configureParams = {
        op: WriteOp.CONFIGURE,
        configuration: {
            properties: {
                "multiple.schemas": "true" 
                // si se tiene que escribir mensajes 
                // con varios schemas en el mismo topic
            }
        }
}

luego se escribe el schema y a continuación los mensajes:

javascript
var account = {
        id: "uuid",
        nombre: "pepe"
};


var schemaParams = {
        op: WriteOp.SCHEMA,
        type: "nombreSchema",
        schema:avroSchema
};

var writeParams = {
        op: WriteOp.WRITE,
        key: ""+account.id,
        type: "nombreSchema",
        topic: "account",
        value: JSON.stringify(account)
};

writeStream.write(schemaParams);

writeStream.write(writeParams);

writeStream.on('data', function(response) {
    console.log(response);
});
writeStream.on('end', function(e) {
    console.log("end");
    onEnd();
});
writeStream.on('error', function(e) {
    console.error(e);
    onEnd();
});
var account = {
        id: "uuid",
        nombre: "pepe"
};


var schemaParams = {
        op: WriteOp.SCHEMA,
        type: "nombreSchema",
        schema:avroSchema
};

var writeParams = {
        op: WriteOp.WRITE,
        key: ""+account.id,
        type: "nombreSchema",
        topic: "account",
        value: JSON.stringify(account)
};

writeStream.write(schemaParams);

writeStream.write(writeParams);

writeStream.on('data', function(response) {
    console.log(response);
});
writeStream.on('end', function(e) {
    console.log("end");
    onEnd();
});
writeStream.on('error', function(e) {
    console.error(e);
    onEnd();
});

Propiedades de configuración del productor

  • bootstrap.servers: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La ip del kafka al que conectarse, puede ser una ip o todas las del clúster.
  • schema.registry.url: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La url del schema registry dónde se guardarán los schemas, puede ser una url o varias.
  • multiple.schemas: Esta propiedad es opcional y solo hay que usarla si queremos varios schemas en un mismo topics. En dicho caso, pondremos de valor: true

También se pueden añadir otras propiedades del productor Kafka

Cómo consumir mensajes

Primero tenemos que utilizar el client precompilado Node, para hacer esto se tiene que añadir al fichero .npmrc de la carpeta del proyecto, el repositorio de Azure DevOps / nautilus.

Luego se instala el client

npm install --save @datahub/datahub-client-node@2.3.19
npm install --save @datahub/datahub-client-node@2.3.19

Entonces se tiene que importar el modulo, utilizando la generación de código dinámica. También hay código estático generado e incluido en el paquete, pero si se utiliza el código estático no se puede pasar objectos Json a grpc y se tiene que utilizar las clases de modelo y los métodos setter para crear los objectos.

javascript
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();

var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;

DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());
const grpc = require('grpc');
const datahub = require("@datahub/datahub-client-node");
const protoDescriptor = datahub.dynamicCodegen();

var DataHubService = protoDescriptor.aquacis.swap.Proto.DataHubService;

DataHubService = new DataHubService('localhost:'+SIDECAR_PORT, grpc.credentials.createInsecure());

Una vez con el service definido, creamos el stream para recibir datos

javascript
var listenStream = DataHubService.listen();
var listenStream = DataHubService.listen();

Después enviamos un mensaje para configurar el stream

javascript
var configureParams = {
    op: ListenOp.CONFIGURE,
    configuration: {
        topics: ["topic1","topic2"],
        //batch: 1 // por defecto a 1
        properties: {
            // se pueden usar la mismas propiedades del consumer Kafka
        }
    }
};

listenStream.write(configureParams);
var configureParams = {
    op: ListenOp.CONFIGURE,
    configuration: {
        topics: ["topic1","topic2"],
        //batch: 1 // por defecto a 1
        properties: {
            // se pueden usar la mismas propiedades del consumer Kafka
        }
    }
};

listenStream.write(configureParams);

Entonces recibimos los updates y hacemos COMMIT o RETRY, dependiendo que el procesamiento haya ido bien o haya fallado.

Si el sidecar tarda mas de sidecar.drain.timeout millisegundos en recibir un mensaje de la cola y no ha terminado un batch, termina antes y envia un mensaje con error.code: COMMIT_NOTIFICATION.

(a partir de la 3.1.4) Si el sidecar está esperando un commit o un retry y no lo recibe en sidecar.commit.timeout millisegundos, envía otro COMMIT_NOTIFICATION y luego otro cada sidecar.commit.timeout millisegundos, hasta que reciba un commit o retry o llegue a sidecar.commit.notification.max.retries. Si llega al maximo numero de reintentos termina el consumidor y el client tiene que lanzar otro método listen.

Si se envía un commit o un retry mientras que se está ejecutando otro, el sidecar devolverá un DUPLICATED_BATCH_TERMINATION_ERROR. Si se envía un commit o un retry y el sidecar no está esperando uno de estos enviará un error de tipo UNEXPECTED_BATCH_TERMINATION_ERROR.

Si se especifica sidecar.commit.notification.always el sidecar enviará un COMMIT_NOTIFICATION cuando termina cada batch, tanto cuando termina por que llega a batch size como cuando haya un drain timeout. De esta manera, el client no tiene que contar el numero de mensajes del batch y solo los contará el sidecar. Esta modalidad no está activa por defecto ya que el batch size por defecto es 1, y está aconsejada solo si el batch size es > 1 y el overhead de tener este mensaje de commit notification es muy poco.

javascript
listenStream.on('data', function(update) {
    console.log(update);
    //Ejecuto procesamiento
    //Si el procesamiento ha ido bien
    listen.write({op: ListenOp.COMMIT});
    //Si el procesamiento ha fallado en algún momento
    listen.write({op: ListenOp.RETRY}); 
});

listenStream.on('end', function() {
    console.log("stream ended", arguments);
    onError(stream);
});
listenStream.on('error', function(e) {
    console.error(e);
    onError(stream);
});
listenStream.on('data', function(update) {
    console.log(update);
    //Ejecuto procesamiento
    //Si el procesamiento ha ido bien
    listen.write({op: ListenOp.COMMIT});
    //Si el procesamiento ha fallado en algún momento
    listen.write({op: ListenOp.RETRY}); 
});

listenStream.on('end', function() {
    console.log("stream ended", arguments);
    onError(stream);
});
listenStream.on('error', function(e) {
    console.error(e);
    onError(stream);
});

Propiedades de configuración del consumidor

  • topics un array de topics que se tienen que escuchar

  • properties propiedades del consumidor Kafka, en particular

    • bootstrap.servers: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La ip del kafka al que conectarse, puede ser una ip o todas las del clúster.
    • schema.registry.url: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. La url del schema registry dónde se guardarán los schemas, puede ser una url o varias.
    • group.id: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Nombre del ConsumerGroup que se usa para consumir. Altamente recomendable tener distintos nombres por servicio, para que cada servicio lleve su propio orden de consumición, sino cuando un servicio lo procese el otro servicio no lo hará. En cambio, un servicio con distintas instancias utilizará el mismo nombre para repartir las particiones.
    • sidecar.drain.timeout: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Timeout despues de que se considera un batch como terminado si no se han recibido mensajes durante este tiempo y no se ha llegado a batch size. Se envía un COMMIT_NOTIFICATION y se espera un commit o un retry. Por defecto es 5000 ms
    • sidecar.commit.timeout: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Timeout despues de que si el sidecar está esperando un commit o un retry y no lo recibe, envía otro COMMIT_NOTIFICATION. Por defecto es 1000 ms
    • sidecar.commit.notification.max.retries: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Maximo numero de reintentos de commit con COMMIT_NOTIFICATION después de que se termina el bi-directional stream que coresponde a la llamada listen. Por defecto es 10
    • sidecar.commit.notification.always: Esta propiedad es opcional y solo hay que usarla si no está configurada por defecto en el sidecar. Si enviar un COMMIT_NOTIFICATION por cada batch terminado, tan cuando el batch termina solo, como cuando un drain timeout envía un commit notification. Permite a los clients de no contar el numero de mensajes y que se haga solo en el sidecar. Por defecto es false

    También se pueden añadir otras propiedades del consumidor Kafka

Multitenant

La funcionalidad de Multitenant permite de enviar mensajes a un topic que son específicos de un tenant y solo pueden leerlos los servicios que están autorizados para este topic y tenant.

También se puede empezar a escuchar un topic y si este topic es multitenant, se recibirán los mensajes de todos los tenants que están autorizados al usuario especificado en la configuración del Sidecar.

Cómo publicar mensajes a un topic Multitenant

Para publicar mensajes en un topic multitenant, además del topic, se tiene que especificar un tenantId cuando se escribe el mensaje WRITE

javascript
var writeParams = {
        op: WriteOp.WRITE,
        type: "<schemaId>",
        topic: "<multitenantTopic>",
        tenantId: "<tenantId>",
        key: "<key>",
        value: "<value>"
};
var writeParams = {
        op: WriteOp.WRITE,
        type: "<schemaId>",
        topic: "<multitenantTopic>",
        tenantId: "<tenantId>",
        key: "<key>",
        value: "<value>"
};

Cómo consumir mensajes de un topic Multitenant

Para consumir mensaje de un topic Multitenant solo se tiene que especificar el nombre del topic multitenant. Cuando se recibirán las actualizaciones, estan tendrán un campo tenantId con el id del tenant que ha recibido el mensaje.

javascript
var listenParams = {
    op: ListenOp.CONFIGURE,
    configuration: {
        topics: ["account"]
    }
}

...

listen.write(listenParams);

...
listen.on('data', function(update) {
    ...
    console.log(update.tenantId);
    ...
});
var listenParams = {
    op: ListenOp.CONFIGURE,
    configuration: {
        topics: ["account"]
    }
}

...

listen.write(listenParams);

...
listen.on('data', function(update) {
    ...
    console.log(update.tenantId);
    ...
});

Tambien se puede especificar de escuchar a topic multitenant y no multitenant al mismo tiempo o se puede especificar de escuchar solo a un subconjunto de los tenants de un topic, siempre que el tenant esté autorizado para el usuario. Para hacer esto se tiene que añadir un topic de tipo <multitenantTopic>_<tenantId> en la lista de los topic a que se le subscribe.

javascript
var listenParams = {
    op: ListenOp.CONFIGURE,
    configuration: {
        topics: ["account_123","account_345", "not-multitenant-topic"]
    }
}
var listenParams = {
    op: ListenOp.CONFIGURE,
    configuration: {
        topics: ["account_123","account_345", "not-multitenant-topic"]
    }
}

Otras consideraciones

Es importante tener en cuenta la gestión de la conexión con el sidecar, para ellos hay que hacer reintentos de conexión. En nuetro ejemplo, de eso se encarga la función onError:

javascript
var onError = (clientSocket)=>{
    if(clientSocket) clientSocket.end();
    setTimeout(()=>{
        //Volver a conectarse al sidecar
    }, 5000);
}
var onError = (clientSocket)=>{
    if(clientSocket) clientSocket.end();
    setTimeout(()=>{
        //Volver a conectarse al sidecar
    }, 5000);
}

También es importante recalcar que para node se ha creado una librería que realiza todas estas tareas de gestión. Se puede encontrar en el PlatformNodeTools.