package com.cinnober.rtc.sdk.service.publishing;
/**
* Publishes objects to external streams for example Kafka.
*/
public interface ExternalPublisher {
/**
* Publishes the given object to external streams.
* @param object The object to publish.
* @throws IOException if the publication fails.
*/
void publish(MsgObject object) throws IOException;
}
KafkaPublisher.java
class KafkaPublisher implements ExternalPublisher {
...
@Override
public void publish(MsgObject object) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
jsonCodec.encode(object, outputStream);
final String json = outputStream.toString("UTF-8");
final String topic = config.getTopic(object.getClass());
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, json);
kafkaProducer.send(record);
}
}
FwConfiguration.xml
<FunctionalPartition id="PM" encoding="10" wrappedEncoding="67">
...
<KafkaPublisherConfig enabled="false" connections="localhost:9092">
<topic type="PositionEvent" topic="positions"/>
<topic type="TradeEvent" topic="trades"/>
</KafkaPublisherConfig>
...
</FunctionalPartition>
KafkaPublisherModule.java
@Provides
@Singleton
private KafkaProducer<String, String> provideKafkaProducer() throws UnknownHostException {
final Properties props = new Properties();
props.put("client.id", InetAddress.getLocalHost().getHostName());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.connections);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
KafkaConsumer<String,String> consumer = new KafkaConsumer<>()
while(true) {
ConsumerRecords<String, String> records =
positionConsumer.poll(Duration.ofMillis(200));
for (ConsumerRecord<String, String> record: records) {
byte[] bytes = record.value().getBytes();
Object object = JSON_CODEC
.decode(new ByteArrayInputStream(bytes));
if (object instanceof PositionEvent) {
// Handle PositionEvent
}
}
}