Skip to content

Ejemplos de conexión a Kafka

Introducción

Para usar otras tecnologías consulta la documentación de Confluent.

Node.js

Aquí puedes descargar el proyecto completo.

js
const { Kafka } = require("kafkajs");
const {
    SchemaRegistry,
    SchemaType,
} = require("@kafkajs/confluent-schema-registry");

// Esquema Avro, para serialización y deserialización de mensajes.
// Indica el formato del mensaje.
const schema = `
  {
    "type": "record",
    "name": "RandomTest",
    "namespace": "examples",
    "fields": [{ "type": "string", "name": "fullName" }]
  }
`;

// Credenciales de ejemplo.
// Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
const username = "username";
const password = "password";
const topic = "test-topic";

// Conexión al registro de esquemas. Se utiliza para guardar
// y recuperar todos los esquemas generados.
// La conexión está securizada. Utiliza las credenciales proporcionadas.
const registry = new SchemaRegistry({
    host: "https://schema-registry-security.nautilus.app",
    auth: {
        username,
        password,
    },
});

// Conexión con el Kafka del Datahub.
// Es obligatorio especificar los brokers con su puerto,
// protocolo SSL con el certificado previamente descargado y
// SASL con las credenciales proporcionadas.
const kafka = new Kafka({
    // Urls para los servidores de Kafka.
    brokers: [
        "public-broker1.nautilus.app:8096",
        "public-broker2.nautilus.app:8096",
        "public-broker3.nautilus.app:8096",
        "public-broker4.nautilus.app:8096",
        "public-broker5.nautilus.app:8096",
    ],
    // Llamadas securizadas. Es obligatorio usar el certificado previamente descargado.
    ssl: {
        rejectUnauthorized: true
    },
    // Autenticación para Kafka, con el método de hash SHA512.
    sasl: {
        mechanism: "scram-sha-512",
        username,
        password,
    },
});

// Código de ejecución principal.
(async () => {
    // Se registra el esquema Avro para poder producir mensajes serializados.
    // Este paso no es necesario si solo queremos consumir mensajes.
    const { id: schemaId } = await registry.register(
        {
            type: SchemaType.AVRO,
            schema
        },
        { subject: topic }
    );
    // Se crean los clientes para Kafka, un productor y un consumidor.
    const producer = kafka.producer();
    // Para consumir es recomendable crear un grupo de consumidores con un identificador único.
    const consumer = kafka.consumer({
        groupId: "f3dd7f9f-1b3c-4e47-a96c-f8018a45d175",
    });

    // En este ejemplo solo se produce un mensaje y después cerramos la conexión
    // ya que no produciremos ningún mensaje más.
    // El topic que se usa es un nombre de prueba.
    await producer.connect();
    await producer.send({
        topic,
        messages: [
            // Los datos del mensaje se serializan mediante el esquema generado anteriormente.
            {
                value: await registry.encode(schemaId, {
                    type: "Whatever",
                    fullName: "Just a Name",
                }),
            },
        ],
    });
    await producer.disconnect();

    // El consumidor se conecta y empieza a consumir del topic desde el inicio.
    // Consumirá todos los mensajes que sean producidos en el topic.
    await consumer.connect();
    await consumer.subscribe({ topic, fromBeginning: true });
    await consumer.run({
        // Función que se ejecuta por cada mensaje que es leido por el consumidor.
        eachMessage: async ({ topic, partition, message }) =>
            console.log(topic, partition, {
                // Deserializamos el mensaje para obtener su valor real.
                value: await registry.decode(message.value),
            }),
    });
})();
const { Kafka } = require("kafkajs");
const {
    SchemaRegistry,
    SchemaType,
} = require("@kafkajs/confluent-schema-registry");

// Esquema Avro, para serialización y deserialización de mensajes.
// Indica el formato del mensaje.
const schema = `
  {
    "type": "record",
    "name": "RandomTest",
    "namespace": "examples",
    "fields": [{ "type": "string", "name": "fullName" }]
  }
`;

// Credenciales de ejemplo.
// Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
const username = "username";
const password = "password";
const topic = "test-topic";

// Conexión al registro de esquemas. Se utiliza para guardar
// y recuperar todos los esquemas generados.
// La conexión está securizada. Utiliza las credenciales proporcionadas.
const registry = new SchemaRegistry({
    host: "https://schema-registry-security.nautilus.app",
    auth: {
        username,
        password,
    },
});

// Conexión con el Kafka del Datahub.
// Es obligatorio especificar los brokers con su puerto,
// protocolo SSL con el certificado previamente descargado y
// SASL con las credenciales proporcionadas.
const kafka = new Kafka({
    // Urls para los servidores de Kafka.
    brokers: [
        "public-broker1.nautilus.app:8096",
        "public-broker2.nautilus.app:8096",
        "public-broker3.nautilus.app:8096",
        "public-broker4.nautilus.app:8096",
        "public-broker5.nautilus.app:8096",
    ],
    // Llamadas securizadas. Es obligatorio usar el certificado previamente descargado.
    ssl: {
        rejectUnauthorized: true
    },
    // Autenticación para Kafka, con el método de hash SHA512.
    sasl: {
        mechanism: "scram-sha-512",
        username,
        password,
    },
});

// Código de ejecución principal.
(async () => {
    // Se registra el esquema Avro para poder producir mensajes serializados.
    // Este paso no es necesario si solo queremos consumir mensajes.
    const { id: schemaId } = await registry.register(
        {
            type: SchemaType.AVRO,
            schema
        },
        { subject: topic }
    );
    // Se crean los clientes para Kafka, un productor y un consumidor.
    const producer = kafka.producer();
    // Para consumir es recomendable crear un grupo de consumidores con un identificador único.
    const consumer = kafka.consumer({
        groupId: "f3dd7f9f-1b3c-4e47-a96c-f8018a45d175",
    });

    // En este ejemplo solo se produce un mensaje y después cerramos la conexión
    // ya que no produciremos ningún mensaje más.
    // El topic que se usa es un nombre de prueba.
    await producer.connect();
    await producer.send({
        topic,
        messages: [
            // Los datos del mensaje se serializan mediante el esquema generado anteriormente.
            {
                value: await registry.encode(schemaId, {
                    type: "Whatever",
                    fullName: "Just a Name",
                }),
            },
        ],
    });
    await producer.disconnect();

    // El consumidor se conecta y empieza a consumir del topic desde el inicio.
    // Consumirá todos los mensajes que sean producidos en el topic.
    await consumer.connect();
    await consumer.subscribe({ topic, fromBeginning: true });
    await consumer.run({
        // Función que se ejecuta por cada mensaje que es leido por el consumidor.
        eachMessage: async ({ topic, partition, message }) =>
            console.log(topic, partition, {
                // Deserializamos el mensaje para obtener su valor real.
                value: await registry.decode(message.value),
            }),
    });
})();

Python

Aquí puedes descargar el proyecto completo.

Consumidor

py
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

def set_schema_subject(serializationContext, str):
    return 'test-topic'


# Credenciales de ejemplo.
# Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
username = "username"
password = "password"

# Conexión al registro de esquemas. Se utiliza para guardar y recuperar todos los esquemas generados.
# La conexión está securizada. Utiliza las credenciales proporcionadas.

schema_registry_url = 'https://schema-registry-security.pre.nautilus.app'
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url,
                                               'basic.auth.user.info': f'{username}:{password}'})
avro_deserializer = AvroDeserializer(schema_registry_client, {'subject.name.strategy': set_schema_subject})
string_deserializer = StringDeserializer('utf_8')

brokers = 'public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096'

# Para consumir es recomendable crear un grupo de consumidores con un identificador único.
groupId = 'test_902c4be8-27fa-4f2d-9b13-e7d0l13f2a3a'

# El nombre del topic del que vamos a consumir, su pueden indicar varios dentro de la array
topics = ['topic.name']

consumer_conf = {
    'bootstrap.servers': brokers,
    'group.id': groupId,
    'auto.offset.reset': 'earliest',
    'sasl.username': username,
    'sasl.password': password,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'key.deserializer': string_deserializer,
    'value.deserializer': avro_deserializer
}

consumer = DeserializingConsumer(consumer_conf)

consumer.subscribe(topics)

while True:
    try:
        msg = consumer.poll(120)
        if msg is None:
            continue
        if msg.value() is not None:
            print(msg.key(), msg.value())
    except KeyboardInterrupt:
        break

consumer.close()
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

def set_schema_subject(serializationContext, str):
    return 'test-topic'


# Credenciales de ejemplo.
# Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
username = "username"
password = "password"

# Conexión al registro de esquemas. Se utiliza para guardar y recuperar todos los esquemas generados.
# La conexión está securizada. Utiliza las credenciales proporcionadas.

schema_registry_url = 'https://schema-registry-security.pre.nautilus.app'
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url,
                                               'basic.auth.user.info': f'{username}:{password}'})
avro_deserializer = AvroDeserializer(schema_registry_client, {'subject.name.strategy': set_schema_subject})
string_deserializer = StringDeserializer('utf_8')

brokers = 'public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096'

# Para consumir es recomendable crear un grupo de consumidores con un identificador único.
groupId = 'test_902c4be8-27fa-4f2d-9b13-e7d0l13f2a3a'

# El nombre del topic del que vamos a consumir, su pueden indicar varios dentro de la array
topics = ['topic.name']

consumer_conf = {
    'bootstrap.servers': brokers,
    'group.id': groupId,
    'auto.offset.reset': 'earliest',
    'sasl.username': username,
    'sasl.password': password,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'key.deserializer': string_deserializer,
    'value.deserializer': avro_deserializer
}

consumer = DeserializingConsumer(consumer_conf)

consumer.subscribe(topics)

while True:
    try:
        msg = consumer.poll(120)
        if msg is None:
            continue
        if msg.value() is not None:
            print(msg.key(), msg.value())
    except KeyboardInterrupt:
        break

consumer.close()

Productor

py
from uuid import uuid4

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer


class Example(object):
    def __init__(self, name, age, surname):
        self.name = name
        self.age = age
        self.surname = surname

def user_to_dict(user, ctx):
    return dict(name=user.name,
                age=user.age,
                surname=user.surname)


def delivery_report(err, msg):
    if err is not None:
        print("Delivery failed for User record {}: {}".format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))

def set_schema_subject(serializationContext, str):
    return 'test-topic'

def main():
    topic_to_produce = 'test-topic'
    # Esquema Avro, para serialización y deserialización de mensajes.
    # Indica el formato del mensaje.
    schema_str = """
    {
        "namespace": "example.nautilus.app",
        "name": "Example",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string", "default": "name"},
            {"name": "age", "type": "int", "default": 10},
            {"name": "surname", "type": "string", "default": "surname"}
        ]
    }
    """
    # Credenciales de ejemplo.
    # Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
    username = "username"
    password = "password"

    # Conexión al registro de esquemas. Se utiliza para guardar y recuperar todos los esquemas generados.
    # La conexión está securizada. Utiliza las credenciales proporcionadas.

    schema_registry_url = 'https://schema-registry-security.nautilus.app'
    schema_registry_client = SchemaRegistryClient({'url': schema_registry_url,
                                                   'basic.auth.user.info': f'{username}:{password}'})

    avro_serializer = AvroSerializer(schema_registry_client, schema_str,
                                     user_to_dict, {'subject.name.strategy': set_schema_subject})

    brokers = 'public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096'

    producer_conf = {
        'bootstrap.servers': brokers,
        'sasl.username': username,
        'sasl.password': password,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'key.serializer': StringSerializer('utf_8'),
        'value.serializer': avro_serializer
    }

    producer = SerializingProducer(producer_conf)

    value_to_produce = Example(name="Example", age=24, surname="Surname")
    number_of_messages = 3
    while number_of_messages > 0:
        producer.poll(0.0)
        try:
            producer.produce(topic=topic_to_produce, key=str(uuid4()), value=value_to_produce,
                             on_delivery=delivery_report)
            number_of_messages -= 1
        except KeyboardInterrupt:
            break
        except ValueError:
            print("Invalid input, discarding record...")
            continue

    print("\nFlushing records...")
    producer.flush()


if __name__ == '__main__':
    main()
from uuid import uuid4

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer


class Example(object):
    def __init__(self, name, age, surname):
        self.name = name
        self.age = age
        self.surname = surname

def user_to_dict(user, ctx):
    return dict(name=user.name,
                age=user.age,
                surname=user.surname)


def delivery_report(err, msg):
    if err is not None:
        print("Delivery failed for User record {}: {}".format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))

def set_schema_subject(serializationContext, str):
    return 'test-topic'

def main():
    topic_to_produce = 'test-topic'
    # Esquema Avro, para serialización y deserialización de mensajes.
    # Indica el formato del mensaje.
    schema_str = """
    {
        "namespace": "example.nautilus.app",
        "name": "Example",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string", "default": "name"},
            {"name": "age", "type": "int", "default": 10},
            {"name": "surname", "type": "string", "default": "surname"}
        ]
    }
    """
    # Credenciales de ejemplo.
    # Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
    username = "username"
    password = "password"

    # Conexión al registro de esquemas. Se utiliza para guardar y recuperar todos los esquemas generados.
    # La conexión está securizada. Utiliza las credenciales proporcionadas.

    schema_registry_url = 'https://schema-registry-security.nautilus.app'
    schema_registry_client = SchemaRegistryClient({'url': schema_registry_url,
                                                   'basic.auth.user.info': f'{username}:{password}'})

    avro_serializer = AvroSerializer(schema_registry_client, schema_str,
                                     user_to_dict, {'subject.name.strategy': set_schema_subject})

    brokers = 'public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096'

    producer_conf = {
        'bootstrap.servers': brokers,
        'sasl.username': username,
        'sasl.password': password,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'key.serializer': StringSerializer('utf_8'),
        'value.serializer': avro_serializer
    }

    producer = SerializingProducer(producer_conf)

    value_to_produce = Example(name="Example", age=24, surname="Surname")
    number_of_messages = 3
    while number_of_messages > 0:
        producer.poll(0.0)
        try:
            producer.produce(topic=topic_to_produce, key=str(uuid4()), value=value_to_produce,
                             on_delivery=delivery_report)
            number_of_messages -= 1
        except KeyboardInterrupt:
            break
        except ValueError:
            print("Invalid input, discarding record...")
            continue

    print("\nFlushing records...")
    producer.flush()


if __name__ == '__main__':
    main()

Java

Aquí puedes descargar el proyecto completo.

Consumidor

java
package kafka.example;

import kafka.example.config.SoftKafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;


import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {

    public static void startConsumer() {
        Properties properties = new Properties();

        // Credenciales de ejemplo.
        // Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
        String groupId = "test_change_me1";
        String username = "username";
        String password = "password";
        String topicName = "test-topic";
        String brokers = "public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096";

        properties.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(Utils.SASL_CREDENTIALS, username, password));

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put("basic.auth.credentials.source", "USER_INFO");
        properties.put("schema.registry.basic.auth.user.info", username + ":" + password);
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        properties.put("group.id", groupId);
        properties.put("auto.commit.enable", "false");
        properties.put("auto.offset.reset", "earliest");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");

        // avro part (deserializer)
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", SoftKafkaAvroDeserializer.class.getName());
        properties.setProperty("schema.registry.url", "https://schema-registry-security.dev.nautilus.app");
        properties.setProperty("specific.avro.reader", "false");

        KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singleton(topicName));

        System.out.println("Waiting for data on topic: " + topicName + "...");
        try{
            while (true){
                System.out.println("Polling");
                ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(Duration.ofMillis(20000));
                for (ConsumerRecord<String, GenericRecord> record : records){
                    System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                }
                kafkaConsumer.commitSync();
            }
        }catch (Exception e){
            System.out.printf("Error consuming message: %s \n", e.getMessage());
        }finally {
            kafkaConsumer.close();
        }
    }
}
package kafka.example;

import kafka.example.config.SoftKafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;


import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {

    public static void startConsumer() {
        Properties properties = new Properties();

        // Credenciales de ejemplo.
        // Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
        String groupId = "test_change_me1";
        String username = "username";
        String password = "password";
        String topicName = "test-topic";
        String brokers = "public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096";

        properties.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(Utils.SASL_CREDENTIALS, username, password));

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put("basic.auth.credentials.source", "USER_INFO");
        properties.put("schema.registry.basic.auth.user.info", username + ":" + password);
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        properties.put("group.id", groupId);
        properties.put("auto.commit.enable", "false");
        properties.put("auto.offset.reset", "earliest");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");

        // avro part (deserializer)
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", SoftKafkaAvroDeserializer.class.getName());
        properties.setProperty("schema.registry.url", "https://schema-registry-security.dev.nautilus.app");
        properties.setProperty("specific.avro.reader", "false");

        KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singleton(topicName));

        System.out.println("Waiting for data on topic: " + topicName + "...");
        try{
            while (true){
                System.out.println("Polling");
                ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(Duration.ofMillis(20000));
                for (ConsumerRecord<String, GenericRecord> record : records){
                    System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                }
                kafkaConsumer.commitSync();
            }
        }catch (Exception e){
            System.out.printf("Error consuming message: %s \n", e.getMessage());
        }finally {
            kafkaConsumer.close();
        }
    }
}

Productor

java
package kafka.example;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import kafka.example.config.SoftKafkaAvroSerializer;

import java.util.Properties;

public class Producer {

    public static void startProducer() {
        Properties props = new Properties();

        String username = "username";
        String password = "password";
        String topicName = "test-topic";
        String brokers = "public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096";

        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(Utils.SASL_CREDENTIALS, username, password));

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SoftKafkaAvroSerializer.class.getName());
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        props.put("schema.registry.url", "https://schema-registry-security.dev.nautilus.app");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("schema.registry.basic.auth.user.info", username + ":" + password);

        KafkaProducer producer = new KafkaProducer(props);
        String key = null; // Si no es necesrio se puede poner a null
        // Esquema Avro, para serialización y deserialización de mensajes.
        // Indica el formato del mensaje, esta en formato json escape
        String userSchema = "{\"type\":\"record\",\"name\":\"test\",\"namespace\":\"nautilus.test\",\"doc\":\"Test event schema\",\"fields\":[{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("country", "France");
        avroRecord.put("city", "Paris");

        ProducerRecord<Object, Object> record = new ProducerRecord<>(topicName, key, avroRecord);
        try {
            producer.send(record);
            System.out.println("Message produced");
            System.out.println(record);
        } catch(Exception e) {
            System.out.printf("Error producing message: %s \n", e.getMessage());
        }
        finally {
            producer.flush();
            producer.close();
        }
    }
}
package kafka.example;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import kafka.example.config.SoftKafkaAvroSerializer;

import java.util.Properties;

public class Producer {

    public static void startProducer() {
        Properties props = new Properties();

        String username = "username";
        String password = "password";
        String topicName = "test-topic";
        String brokers = "public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096";

        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(Utils.SASL_CREDENTIALS, username, password));

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SoftKafkaAvroSerializer.class.getName());
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        props.put("schema.registry.url", "https://schema-registry-security.dev.nautilus.app");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("schema.registry.basic.auth.user.info", username + ":" + password);

        KafkaProducer producer = new KafkaProducer(props);
        String key = null; // Si no es necesrio se puede poner a null
        // Esquema Avro, para serialización y deserialización de mensajes.
        // Indica el formato del mensaje, esta en formato json escape
        String userSchema = "{\"type\":\"record\",\"name\":\"test\",\"namespace\":\"nautilus.test\",\"doc\":\"Test event schema\",\"fields\":[{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("country", "France");
        avroRecord.put("city", "Paris");

        ProducerRecord<Object, Object> record = new ProducerRecord<>(topicName, key, avroRecord);
        try {
            producer.send(record);
            System.out.println("Message produced");
            System.out.println(record);
        } catch(Exception e) {
            System.out.printf("Error producing message: %s \n", e.getMessage());
        }
        finally {
            producer.flush();
            producer.close();
        }
    }
}