Appearance
Ejemplos de conexión a Kafka
Introducción
Para usar otras tecnologías consulta la documentación de Confluent.
Node.js
Aquí puedes descargar el proyecto completo.
js
const { Kafka } = require("kafkajs");
const {
SchemaRegistry,
SchemaType,
} = require("@kafkajs/confluent-schema-registry");
// Esquema Avro, para serialización y deserialización de mensajes.
// Indica el formato del mensaje.
const schema = `
{
"type": "record",
"name": "RandomTest",
"namespace": "examples",
"fields": [{ "type": "string", "name": "fullName" }]
}
`;
// Credenciales de ejemplo.
// Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
const username = "username";
const password = "password";
const topic = "test-topic";
// Conexión al registro de esquemas. Se utiliza para guardar
// y recuperar todos los esquemas generados.
// La conexión está securizada. Utiliza las credenciales proporcionadas.
const registry = new SchemaRegistry({
host: "https://schema-registry-security.nautilus.app",
auth: {
username,
password,
},
});
// Conexión con el Kafka del Datahub.
// Es obligatorio especificar los brokers con su puerto,
// protocolo SSL con el certificado previamente descargado y
// SASL con las credenciales proporcionadas.
const kafka = new Kafka({
// Urls para los servidores de Kafka.
brokers: [
"public-broker1.nautilus.app:8096",
"public-broker2.nautilus.app:8096",
"public-broker3.nautilus.app:8096",
"public-broker4.nautilus.app:8096",
"public-broker5.nautilus.app:8096",
],
// Llamadas securizadas. Es obligatorio usar el certificado previamente descargado.
ssl: {
rejectUnauthorized: true
},
// Autenticación para Kafka, con el método de hash SHA512.
sasl: {
mechanism: "scram-sha-512",
username,
password,
},
});
// Código de ejecución principal.
(async () => {
// Se registra el esquema Avro para poder producir mensajes serializados.
// Este paso no es necesario si solo queremos consumir mensajes.
const { id: schemaId } = await registry.register(
{
type: SchemaType.AVRO,
schema
},
{ subject: topic }
);
// Se crean los clientes para Kafka, un productor y un consumidor.
const producer = kafka.producer();
// Para consumir es recomendable crear un grupo de consumidores con un identificador único.
const consumer = kafka.consumer({
groupId: "f3dd7f9f-1b3c-4e47-a96c-f8018a45d175",
});
// En este ejemplo solo se produce un mensaje y después cerramos la conexión
// ya que no produciremos ningún mensaje más.
// El topic que se usa es un nombre de prueba.
await producer.connect();
await producer.send({
topic,
messages: [
// Los datos del mensaje se serializan mediante el esquema generado anteriormente.
{
value: await registry.encode(schemaId, {
type: "Whatever",
fullName: "Just a Name",
}),
},
],
});
await producer.disconnect();
// El consumidor se conecta y empieza a consumir del topic desde el inicio.
// Consumirá todos los mensajes que sean producidos en el topic.
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.run({
// Función que se ejecuta por cada mensaje que es leido por el consumidor.
eachMessage: async ({ topic, partition, message }) =>
console.log(topic, partition, {
// Deserializamos el mensaje para obtener su valor real.
value: await registry.decode(message.value),
}),
});
})();
const { Kafka } = require("kafkajs");
const {
SchemaRegistry,
SchemaType,
} = require("@kafkajs/confluent-schema-registry");
// Esquema Avro, para serialización y deserialización de mensajes.
// Indica el formato del mensaje.
const schema = `
{
"type": "record",
"name": "RandomTest",
"namespace": "examples",
"fields": [{ "type": "string", "name": "fullName" }]
}
`;
// Credenciales de ejemplo.
// Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
const username = "username";
const password = "password";
const topic = "test-topic";
// Conexión al registro de esquemas. Se utiliza para guardar
// y recuperar todos los esquemas generados.
// La conexión está securizada. Utiliza las credenciales proporcionadas.
const registry = new SchemaRegistry({
host: "https://schema-registry-security.nautilus.app",
auth: {
username,
password,
},
});
// Conexión con el Kafka del Datahub.
// Es obligatorio especificar los brokers con su puerto,
// protocolo SSL con el certificado previamente descargado y
// SASL con las credenciales proporcionadas.
const kafka = new Kafka({
// Urls para los servidores de Kafka.
brokers: [
"public-broker1.nautilus.app:8096",
"public-broker2.nautilus.app:8096",
"public-broker3.nautilus.app:8096",
"public-broker4.nautilus.app:8096",
"public-broker5.nautilus.app:8096",
],
// Llamadas securizadas. Es obligatorio usar el certificado previamente descargado.
ssl: {
rejectUnauthorized: true
},
// Autenticación para Kafka, con el método de hash SHA512.
sasl: {
mechanism: "scram-sha-512",
username,
password,
},
});
// Código de ejecución principal.
(async () => {
// Se registra el esquema Avro para poder producir mensajes serializados.
// Este paso no es necesario si solo queremos consumir mensajes.
const { id: schemaId } = await registry.register(
{
type: SchemaType.AVRO,
schema
},
{ subject: topic }
);
// Se crean los clientes para Kafka, un productor y un consumidor.
const producer = kafka.producer();
// Para consumir es recomendable crear un grupo de consumidores con un identificador único.
const consumer = kafka.consumer({
groupId: "f3dd7f9f-1b3c-4e47-a96c-f8018a45d175",
});
// En este ejemplo solo se produce un mensaje y después cerramos la conexión
// ya que no produciremos ningún mensaje más.
// El topic que se usa es un nombre de prueba.
await producer.connect();
await producer.send({
topic,
messages: [
// Los datos del mensaje se serializan mediante el esquema generado anteriormente.
{
value: await registry.encode(schemaId, {
type: "Whatever",
fullName: "Just a Name",
}),
},
],
});
await producer.disconnect();
// El consumidor se conecta y empieza a consumir del topic desde el inicio.
// Consumirá todos los mensajes que sean producidos en el topic.
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.run({
// Función que se ejecuta por cada mensaje que es leido por el consumidor.
eachMessage: async ({ topic, partition, message }) =>
console.log(topic, partition, {
// Deserializamos el mensaje para obtener su valor real.
value: await registry.decode(message.value),
}),
});
})();
Python
Aquí puedes descargar el proyecto completo.
Consumidor
py
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
def set_schema_subject(serializationContext, str):
return 'test-topic'
# Credenciales de ejemplo.
# Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
username = "username"
password = "password"
# Conexión al registro de esquemas. Se utiliza para guardar y recuperar todos los esquemas generados.
# La conexión está securizada. Utiliza las credenciales proporcionadas.
schema_registry_url = 'https://schema-registry-security.pre.nautilus.app'
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'})
avro_deserializer = AvroDeserializer(schema_registry_client, {'subject.name.strategy': set_schema_subject})
string_deserializer = StringDeserializer('utf_8')
brokers = 'public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096'
# Para consumir es recomendable crear un grupo de consumidores con un identificador único.
groupId = 'test_902c4be8-27fa-4f2d-9b13-e7d0l13f2a3a'
# El nombre del topic del que vamos a consumir, su pueden indicar varios dentro de la array
topics = ['topic.name']
consumer_conf = {
'bootstrap.servers': brokers,
'group.id': groupId,
'auto.offset.reset': 'earliest',
'sasl.username': username,
'sasl.password': password,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'SCRAM-SHA-512',
'key.deserializer': string_deserializer,
'value.deserializer': avro_deserializer
}
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(topics)
while True:
try:
msg = consumer.poll(120)
if msg is None:
continue
if msg.value() is not None:
print(msg.key(), msg.value())
except KeyboardInterrupt:
break
consumer.close()
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
def set_schema_subject(serializationContext, str):
return 'test-topic'
# Credenciales de ejemplo.
# Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
username = "username"
password = "password"
# Conexión al registro de esquemas. Se utiliza para guardar y recuperar todos los esquemas generados.
# La conexión está securizada. Utiliza las credenciales proporcionadas.
schema_registry_url = 'https://schema-registry-security.pre.nautilus.app'
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'})
avro_deserializer = AvroDeserializer(schema_registry_client, {'subject.name.strategy': set_schema_subject})
string_deserializer = StringDeserializer('utf_8')
brokers = 'public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096'
# Para consumir es recomendable crear un grupo de consumidores con un identificador único.
groupId = 'test_902c4be8-27fa-4f2d-9b13-e7d0l13f2a3a'
# El nombre del topic del que vamos a consumir, su pueden indicar varios dentro de la array
topics = ['topic.name']
consumer_conf = {
'bootstrap.servers': brokers,
'group.id': groupId,
'auto.offset.reset': 'earliest',
'sasl.username': username,
'sasl.password': password,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'SCRAM-SHA-512',
'key.deserializer': string_deserializer,
'value.deserializer': avro_deserializer
}
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(topics)
while True:
try:
msg = consumer.poll(120)
if msg is None:
continue
if msg.value() is not None:
print(msg.key(), msg.value())
except KeyboardInterrupt:
break
consumer.close()
Productor
py
from uuid import uuid4
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
class Example(object):
def __init__(self, name, age, surname):
self.name = name
self.age = age
self.surname = surname
def user_to_dict(user, ctx):
return dict(name=user.name,
age=user.age,
surname=user.surname)
def delivery_report(err, msg):
if err is not None:
print("Delivery failed for User record {}: {}".format(msg.key(), err))
return
print('User record {} successfully produced to {} [{}] at offset {}'.format(
msg.key(), msg.topic(), msg.partition(), msg.offset()))
def set_schema_subject(serializationContext, str):
return 'test-topic'
def main():
topic_to_produce = 'test-topic'
# Esquema Avro, para serialización y deserialización de mensajes.
# Indica el formato del mensaje.
schema_str = """
{
"namespace": "example.nautilus.app",
"name": "Example",
"type": "record",
"fields": [
{"name": "name", "type": "string", "default": "name"},
{"name": "age", "type": "int", "default": 10},
{"name": "surname", "type": "string", "default": "surname"}
]
}
"""
# Credenciales de ejemplo.
# Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
username = "username"
password = "password"
# Conexión al registro de esquemas. Se utiliza para guardar y recuperar todos los esquemas generados.
# La conexión está securizada. Utiliza las credenciales proporcionadas.
schema_registry_url = 'https://schema-registry-security.nautilus.app'
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'})
avro_serializer = AvroSerializer(schema_registry_client, schema_str,
user_to_dict, {'subject.name.strategy': set_schema_subject})
brokers = 'public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096'
producer_conf = {
'bootstrap.servers': brokers,
'sasl.username': username,
'sasl.password': password,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'SCRAM-SHA-512',
'key.serializer': StringSerializer('utf_8'),
'value.serializer': avro_serializer
}
producer = SerializingProducer(producer_conf)
value_to_produce = Example(name="Example", age=24, surname="Surname")
number_of_messages = 3
while number_of_messages > 0:
producer.poll(0.0)
try:
producer.produce(topic=topic_to_produce, key=str(uuid4()), value=value_to_produce,
on_delivery=delivery_report)
number_of_messages -= 1
except KeyboardInterrupt:
break
except ValueError:
print("Invalid input, discarding record...")
continue
print("\nFlushing records...")
producer.flush()
if __name__ == '__main__':
main()
from uuid import uuid4
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
class Example(object):
def __init__(self, name, age, surname):
self.name = name
self.age = age
self.surname = surname
def user_to_dict(user, ctx):
return dict(name=user.name,
age=user.age,
surname=user.surname)
def delivery_report(err, msg):
if err is not None:
print("Delivery failed for User record {}: {}".format(msg.key(), err))
return
print('User record {} successfully produced to {} [{}] at offset {}'.format(
msg.key(), msg.topic(), msg.partition(), msg.offset()))
def set_schema_subject(serializationContext, str):
return 'test-topic'
def main():
topic_to_produce = 'test-topic'
# Esquema Avro, para serialización y deserialización de mensajes.
# Indica el formato del mensaje.
schema_str = """
{
"namespace": "example.nautilus.app",
"name": "Example",
"type": "record",
"fields": [
{"name": "name", "type": "string", "default": "name"},
{"name": "age", "type": "int", "default": 10},
{"name": "surname", "type": "string", "default": "surname"}
]
}
"""
# Credenciales de ejemplo.
# Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
username = "username"
password = "password"
# Conexión al registro de esquemas. Se utiliza para guardar y recuperar todos los esquemas generados.
# La conexión está securizada. Utiliza las credenciales proporcionadas.
schema_registry_url = 'https://schema-registry-security.nautilus.app'
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'})
avro_serializer = AvroSerializer(schema_registry_client, schema_str,
user_to_dict, {'subject.name.strategy': set_schema_subject})
brokers = 'public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096'
producer_conf = {
'bootstrap.servers': brokers,
'sasl.username': username,
'sasl.password': password,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'SCRAM-SHA-512',
'key.serializer': StringSerializer('utf_8'),
'value.serializer': avro_serializer
}
producer = SerializingProducer(producer_conf)
value_to_produce = Example(name="Example", age=24, surname="Surname")
number_of_messages = 3
while number_of_messages > 0:
producer.poll(0.0)
try:
producer.produce(topic=topic_to_produce, key=str(uuid4()), value=value_to_produce,
on_delivery=delivery_report)
number_of_messages -= 1
except KeyboardInterrupt:
break
except ValueError:
print("Invalid input, discarding record...")
continue
print("\nFlushing records...")
producer.flush()
if __name__ == '__main__':
main()
Java
Aquí puedes descargar el proyecto completo.
Consumidor
java
package kafka.example;
import kafka.example.config.SoftKafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
public static void startConsumer() {
Properties properties = new Properties();
// Credenciales de ejemplo.
// Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
String groupId = "test_change_me1";
String username = "username";
String password = "password";
String topicName = "test-topic";
String brokers = "public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096";
properties.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(Utils.SASL_CREDENTIALS, username, password));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put("basic.auth.credentials.source", "USER_INFO");
properties.put("schema.registry.basic.auth.user.info", username + ":" + password);
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
properties.put("group.id", groupId);
properties.put("auto.commit.enable", "false");
properties.put("auto.offset.reset", "earliest");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");
// avro part (deserializer)
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", SoftKafkaAvroDeserializer.class.getName());
properties.setProperty("schema.registry.url", "https://schema-registry-security.dev.nautilus.app");
properties.setProperty("specific.avro.reader", "false");
KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Collections.singleton(topicName));
System.out.println("Waiting for data on topic: " + topicName + "...");
try{
while (true){
System.out.println("Polling");
ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(Duration.ofMillis(20000));
for (ConsumerRecord<String, GenericRecord> record : records){
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
}
kafkaConsumer.commitSync();
}
}catch (Exception e){
System.out.printf("Error consuming message: %s \n", e.getMessage());
}finally {
kafkaConsumer.close();
}
}
}
package kafka.example;
import kafka.example.config.SoftKafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
public static void startConsumer() {
Properties properties = new Properties();
// Credenciales de ejemplo.
// Cámbialas por el usuario y contraseña que te ha proporcionado Soporte.
String groupId = "test_change_me1";
String username = "username";
String password = "password";
String topicName = "test-topic";
String brokers = "public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096";
properties.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(Utils.SASL_CREDENTIALS, username, password));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put("basic.auth.credentials.source", "USER_INFO");
properties.put("schema.registry.basic.auth.user.info", username + ":" + password);
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
properties.put("group.id", groupId);
properties.put("auto.commit.enable", "false");
properties.put("auto.offset.reset", "earliest");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");
// avro part (deserializer)
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", SoftKafkaAvroDeserializer.class.getName());
properties.setProperty("schema.registry.url", "https://schema-registry-security.dev.nautilus.app");
properties.setProperty("specific.avro.reader", "false");
KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Collections.singleton(topicName));
System.out.println("Waiting for data on topic: " + topicName + "...");
try{
while (true){
System.out.println("Polling");
ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(Duration.ofMillis(20000));
for (ConsumerRecord<String, GenericRecord> record : records){
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
}
kafkaConsumer.commitSync();
}
}catch (Exception e){
System.out.printf("Error consuming message: %s \n", e.getMessage());
}finally {
kafkaConsumer.close();
}
}
}
Productor
java
package kafka.example;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import kafka.example.config.SoftKafkaAvroSerializer;
import java.util.Properties;
public class Producer {
public static void startProducer() {
Properties props = new Properties();
String username = "username";
String password = "password";
String topicName = "test-topic";
String brokers = "public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096";
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(Utils.SASL_CREDENTIALS, username, password));
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SoftKafkaAvroSerializer.class.getName());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put("schema.registry.url", "https://schema-registry-security.dev.nautilus.app");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("schema.registry.basic.auth.user.info", username + ":" + password);
KafkaProducer producer = new KafkaProducer(props);
String key = null; // Si no es necesrio se puede poner a null
// Esquema Avro, para serialización y deserialización de mensajes.
// Indica el formato del mensaje, esta en formato json escape
String userSchema = "{\"type\":\"record\",\"name\":\"test\",\"namespace\":\"nautilus.test\",\"doc\":\"Test event schema\",\"fields\":[{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("country", "France");
avroRecord.put("city", "Paris");
ProducerRecord<Object, Object> record = new ProducerRecord<>(topicName, key, avroRecord);
try {
producer.send(record);
System.out.println("Message produced");
System.out.println(record);
} catch(Exception e) {
System.out.printf("Error producing message: %s \n", e.getMessage());
}
finally {
producer.flush();
producer.close();
}
}
}
package kafka.example;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import kafka.example.config.SoftKafkaAvroSerializer;
import java.util.Properties;
public class Producer {
public static void startProducer() {
Properties props = new Properties();
String username = "username";
String password = "password";
String topicName = "test-topic";
String brokers = "public-broker1.nautilus.app:8096,public-broker2.nautilus.app:8096,public-broker3.nautilus.app:8096";
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(Utils.SASL_CREDENTIALS, username, password));
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SoftKafkaAvroSerializer.class.getName());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put("schema.registry.url", "https://schema-registry-security.dev.nautilus.app");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("schema.registry.basic.auth.user.info", username + ":" + password);
KafkaProducer producer = new KafkaProducer(props);
String key = null; // Si no es necesrio se puede poner a null
// Esquema Avro, para serialización y deserialización de mensajes.
// Indica el formato del mensaje, esta en formato json escape
String userSchema = "{\"type\":\"record\",\"name\":\"test\",\"namespace\":\"nautilus.test\",\"doc\":\"Test event schema\",\"fields\":[{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("country", "France");
avroRecord.put("city", "Paris");
ProducerRecord<Object, Object> record = new ProducerRecord<>(topicName, key, avroRecord);
try {
producer.send(record);
System.out.println("Message produced");
System.out.println(record);
} catch(Exception e) {
System.out.printf("Error producing message: %s \n", e.getMessage());
}
finally {
producer.flush();
producer.close();
}
}
}