Kafka

A third-party data streaming platform

What is it?

The glue between data
producers and consumers

Inside the cluster

An append-only log
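A toy model can make the append-only idea concrete. The sketch below is a hypothetical in-memory class (`AppendOnlyLog` is not a Kafka API). Records are only ever appended, each one gets the next sequential offset, and every reader chooses its own starting offset instead of removing what it has read. Real brokers persist the log to disk in segments and apply retention, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * HYPOTHETICAL toy model of a single partition: an in-memory append-only log.
 * Records are never modified or removed; readers track their own offsets.
 */
class AppendOnlyLog {
    private final List<String> records = new ArrayList<>();

    /** Appends a record and returns the offset it was stored at. */
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    /** Returns up to maxRecords records, starting at fromOffset. */
    List<String> read(long fromOffset, int maxRecords) {
        int from = (int) Math.min(fromOffset, records.size());
        int to = Math.min(from + maxRecords, records.size());
        return new ArrayList<>(records.subList(from, to));
    }

    /** Offset that the next appended record will receive. */
    long endOffset() {
        return records.size();
    }

    public static void main(String[] args) {
        AppendOnlyLog log = new AppendOnlyLog();
        log.append("trade-1");
        log.append("trade-2");
        log.append("trade-3");
        // Two independent readers at different offsets read the same log.
        System.out.println(log.read(0, 10)); // [trade-1, trade-2, trade-3]
        System.out.println(log.read(2, 10)); // [trade-3]
    }
}
```

Because reading never deletes anything, any number of consumers can process the same records at their own pace.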

Partitioning
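A topic is split into partitions so that several brokers and consumers can work on it in parallel. Records with the same key land in the same partition and therefore keep their relative order. The sketch below is a hypothetical `KeyPartitioner`. As a simplification it uses `String.hashCode()`, whereas Kafka's default partitioner hashes the serialized key with murmur2, so real partition numbers will differ.

```java
/**
 * HYPOTHETICAL key-based partitioner. ASSUMPTION: uses String.hashCode()
 * instead of Kafka's murmur2 hash, so results differ from a real cluster.
 */
class KeyPartitioner {
    private final int numPartitions;

    KeyPartitioner(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be > 0");
        }
        this.numPartitions = numPartitions;
    }

    /** Maps a key to a partition in [0, numPartitions). */
    int partitionFor(String key) {
        // Clear the sign bit so negative hash codes still give a valid index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        KeyPartitioner partitioner = new KeyPartitioner(3);
        for (String account : new String[] {"ACC-1", "ACC-2", "ACC-1", "ACC-3"}) {
            // "ACC-1" appears twice and gets the same partition both times.
            System.out.println(account + " -> partition " + partitioner.partitionFor(account));
        }
    }
}
```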

Consumer groups
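Within a consumer group, each partition is assigned to exactly one member at a time, so adding consumers (up to the partition count) spreads the load. The sketch below is a simplified range-style assignment, loosely modelled on the idea behind Kafka's range assignor but not its actual code. `ConsumerGroupAssignment` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** HYPOTHETICAL, simplified range-style partition assignment for one topic. */
class ConsumerGroupAssignment {
    /**
     * Splits partitions 0..numPartitions-1 into contiguous ranges, one per
     * consumer (sorted by id). The first numPartitions % members consumers get
     * one extra partition; surplus consumers get an empty list.
     */
    static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions) {
        List<String> members = new ArrayList<>(new TreeSet<>(consumerIds)); // sorted, no duplicates
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result;
        }
        int perConsumer = numPartitions / members.size();
        int withExtra = numPartitions % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            result.put(members.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1", "c2"), 5));             // {c1=[0, 1, 2], c2=[3, 4]}
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3)); // {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```

With more consumers than partitions, the extra members have nothing to read, which is why the partition count caps a group's parallelism.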

Buzzwords

  • Broker
  • Producer
  • Consumer
  • Topic
  • Stream
  • Facts/Events

The PoC

  • Publish facts from RTC
  • Consume facts outside of RTC
  • Create automated tests for data publishing
  • Create scripts for setting up a Kafka cluster
  • Try to integrate this into the framework

Publishing


                            package com.cinnober.rtc.sdk.service.publishing;

                            import java.io.IOException;

                            /**
                             * Publishes objects to external streams, for example Kafka.
                             */
                            public interface ExternalPublisher {
                                /**
                                 * Publishes the given object to external streams.
                                 * @param object The object to publish.
                                 * @throws IOException if the publication fails.
                                 */
                                void publish(MsgObject object) throws IOException;
                            }
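For the PoC goal of automated tests for data publishing, one option is to bind a recording implementation of `ExternalPublisher` in tests instead of `KafkaPublisher`, so that assertions run without a cluster. This is a minimal sketch under stated assumptions: `RecordingPublisher` is hypothetical, the interface is copied locally, and `MsgObject` is replaced by an empty stand-in interface so the example compiles on its own.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// HYPOTHETICAL stand-in for the framework's MsgObject, only so this sketch compiles.
interface MsgObject {}

// Local copy of the ExternalPublisher interface from the slide above.
interface ExternalPublisher {
    void publish(MsgObject object) throws IOException;
}

/**
 * HYPOTHETICAL test double: records every published object instead of
 * sending it to Kafka, so tests can assert on what would have been published.
 */
class RecordingPublisher implements ExternalPublisher {
    private final List<MsgObject> published = new ArrayList<>();

    @Override
    public synchronized void publish(MsgObject object) {
        published.add(object);
    }

    /** Snapshot of everything published so far, in publication order. */
    synchronized List<MsgObject> published() {
        return Collections.unmodifiableList(new ArrayList<>(published));
    }

    public static void main(String[] args) {
        RecordingPublisher publisher = new RecordingPublisher();
        publisher.publish(new MsgObject() {});
        System.out.println(publisher.published().size()); // 1
    }
}
```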
                        

KafkaPublisher.java


                            class KafkaPublisher implements ExternalPublisher {
                                ...
                                @Override
                                public void publish(MsgObject object) throws IOException {
                                    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                                    jsonCodec.encode(object, outputStream);
                                    final String json = outputStream.toString("UTF-8");
                                    final String topic = config.getTopic(object.getClass());
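                                    final ProducerRecord<String, String> record = new ProducerRecord<>(topic, json);
                                    // send() is asynchronous: broker-side failures surface through the returned
                                    // Future or a Callback, not as an IOException thrown from this method.
                                    kafkaProducer.send(record);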
                                }
                            }
                        

FwConfiguration.xml


                            <FunctionalPartition id="PM" encoding="10" wrappedEncoding="67">
                                ...
                                <KafkaPublisherConfig enabled="false" connections="localhost:9092">
                                    <topic type="PositionEvent"  topic="positions"/>
                                    <topic type="TradeEvent"     topic="trades"/>
                                </KafkaPublisherConfig>
                                ...
                            </FunctionalPartition>
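The `<topic>` entries map message types to Kafka topics, which is what `config.getTopic(object.getClass())` in `KafkaPublisher` looks up. The sketch below shows one way such a mapping could be read with the JDK's built-in DOM parser. It is an assumption, not the framework's config loader: `TopicRoutingConfig` is hypothetical, and types are matched by simple class name.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/**
 * HYPOTHETICAL reader for a KafkaPublisherConfig element.
 * ASSUMPTION: the "type" attribute is matched against the simple class name.
 */
class TopicRoutingConfig {
    final boolean enabled;
    final String connections;
    private final Map<String, String> topicsByType = new LinkedHashMap<>();

    TopicRoutingConfig(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        Element cfg = (Element) doc.getElementsByTagName("KafkaPublisherConfig").item(0);
        if (cfg == null) {
            throw new IllegalArgumentException("no KafkaPublisherConfig element");
        }
        enabled = Boolean.parseBoolean(cfg.getAttribute("enabled"));
        connections = cfg.getAttribute("connections");
        // Collect every <topic type=".." topic=".."/> child into the routing table.
        NodeList topics = cfg.getElementsByTagName("topic");
        for (int i = 0; i < topics.getLength(); i++) {
            Element t = (Element) topics.item(i);
            topicsByType.put(t.getAttribute("type"), t.getAttribute("topic"));
        }
    }

    /** Topic for a message class; fails loudly if the class has no mapping. */
    String getTopic(Class<?> type) {
        String topic = topicsByType.get(type.getSimpleName());
        if (topic == null) {
            throw new IllegalArgumentException("no topic configured for " + type.getSimpleName());
        }
        return topic;
    }

    public static void main(String[] args) throws Exception {
        String xml = "<KafkaPublisherConfig enabled=\"false\" connections=\"localhost:9092\">"
                + "<topic type=\"PositionEvent\" topic=\"positions\"/>"
                + "<topic type=\"TradeEvent\" topic=\"trades\"/></KafkaPublisherConfig>";
        TopicRoutingConfig config = new TopicRoutingConfig(xml);
        System.out.println(config.topicsByType); // {PositionEvent=positions, TradeEvent=trades}
    }
}
```

Failing on an unmapped type, rather than silently picking a default topic, makes a missing `<topic>` entry visible as soon as that event type is published.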
                        

KafkaPublisherModule.java


                            @Provides
                            @Singleton
                            private KafkaProducer<String, String> provideKafkaProducer() throws UnknownHostException {
                                final Properties props = new Properties();
                                props.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
                                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.connections);
                                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                                return new KafkaProducer<>(props);
                            }
                        

Consuming


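                            // Consumer settings; the group id is a hypothetical example value.
                            Properties props = new Properties();
                            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                            props.put(ConsumerConfig.GROUP_ID_CONFIG, "position-consumers");
                            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                            try (KafkaConsumer<String, String> positionConsumer = new KafkaConsumer<>(props)) {
                                // Subscribe to the topic PositionEvents are published to (see FwConfiguration.xml)
                                positionConsumer.subscribe(Collections.singletonList("positions"));
                                while (true) {
                                    ConsumerRecords<String, String> records =
                                            positionConsumer.poll(Duration.ofMillis(200));
                                    for (ConsumerRecord<String, String> record : records) {
                                        // The publisher encodes JSON as UTF-8, so decode with the same charset
                                        byte[] bytes = record.value().getBytes(StandardCharsets.UTF_8);
                                        Object object = JSON_CODEC
                                                .decode(new ByteArrayInputStream(bytes));
                                        if (object instanceof PositionEvent) {
                                            // Handle PositionEvent
                                        }
                                    }
                                }
                            }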
                        

Cluster Set-up

  • ./kafka_start.sh
    • Starts ZooKeeper
    • Starts 3 Kafka brokers
  • ./kafka_stop.sh

Configuration and Logging

  • Properties
    • environments/developer/cfg/kafka/broker.properties
    • environments/developer/cfg/kafka/zookeeper.properties
  • Logging
    • environments/developer/deployment/system/kafka/

Problems

  • Cygwin + Kafka = 👎