kafka

kafka utilities
0.6.5 released
maiha/kafka.cr
maiha

kafka.cr

Kafka library and utilities for Crystal.

binary (standalone utilities)
  • x86_64 binary: https://github.com/maiha/kafka.cr/releases
crystal versions
  • 0.24.1 : :white_check_mark: use 0.6.5 or higher
  • 0.23.x : :white_check_mark: use 0.6.5 or higher
  • 0.22.0 : :warning: available at the crystal-0.22.0 tag, but further maintenance is not guaranteed

Example

require "kafka"

kafka = Kafka.new
kafka.topics.map(&.name)  # => ["t1", ...]
kafka.produce "t1", "foo"
kafka.fetch "t1"          # => Kafka::Message("t1#0:0", "foo")
kafka.close

components

  • bin: standalone kafka utility applications (x86 static binary)
  • lib: as crystal library

lib

supported protocols

  • https://github.com/maiha/kafka.cr/blob/master/src/kafka/protocol.cr

Installation

Add it to shard.yml

dependencies:
  kafka:
    github: maiha/kafka.cr
    version: 0.6.5

Then require the library and connect to a broker:

require "kafka"

kafka = Kafka.new("localhost", 9092)

kafka.topics.map(&.name)  # => ["t1", ...]
kafka.produce("t1", "test")
kafka.fetch("t1", 0, 0_i64)  # => Kafka::Message("t1[0]#0", "test")

kafka.close

bin

build

  • first, run crystal deps to download the required libraries
  • then run make, which generates bin/kafka-*
  • the CRYSTAL variable overrides the crystal command used for the build
% make
% make CRYSTAL=/your/customized-crystal/bin/crystal

created binaries (for utils)

  • kafka-broker : Show broker information. "-j" causes json output.
  • kafka-cluster-watch : Report cluster information continually.
  • kafka-error : Look up a kafka error code.
  • kafka-fetch : Fetch logs from kafka. "-g" tries to resolve payload.
  • kafka-info : Show topic information about offsets. (needs only one broker)
  • kafka-ping : Ping a broker, like unix ping.
  • kafka-topics : Show topic information about leader, replicas, isrs. (needs the exact leaders)

created binaries (for kafka protocols study)

  • kafka-heartbeat : Send heartbeat request(api:12). [experimental]
  • kafka-metadata : Send metadata request(api:3).
  • kafka-offset : Send offset request(api:2).

kafka-info

% ./bin/kafka-info t1 t2
t2#0     count=18 [37, 36, 19]
t1#2     count=1 [1, 0]
t1#0     count=1 [1, 0]
t1#1     count=0 [0]
  • count messages in all topics
% ./bin/kafka-info -c -a
2       a
0       b
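
The per-partition counts printed by kafka-info can be rolled up per topic with standard tools. The following is a minimal sketch, assuming each line has the form "topic#partition count=N [...]" as in the output above. sum_counts_by_topic is a hypothetical helper name and is not part of kafka.cr.

```shell
# Sum per-partition counts by topic from kafka-info output on stdin.
# Assumption: each line looks like "<topic>#<partition>  count=<n> [...]".
sum_counts_by_topic() {
  awk '
    {
      split($1, tp, "#")        # tp[1] = topic, tp[2] = partition
      sub(/^count=/, "", $2)    # strip the "count=" prefix
      total[tp[1]] += $2
    }
    END { for (t in total) print t, total[t] }
  ' | sort
}

# Sample input copied from the output shown above. With a reachable broker,
# pipe the real command instead: ./bin/kafka-info t1 t2 | sum_counts_by_topic
sum_counts_by_topic <<'EOF'
t2#0     count=18 [37, 36, 19]
t1#2     count=1 [1, 0]
t1#0     count=1 [1, 0]
t1#1     count=0 [0]
EOF
```

With the sample input this prints "t1 2" and "t2 18".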

kafka-topics

  • bin/kafka-topics shows topic names and metadata
% ./bin/kafka-topics
t1
tmp

% ./bin/kafka-topics -c | sort -n
0       t1
6       tmp

% ./bin/kafka-topics t1 t2
t1(0 => {leader=1,replica=[1],isr=[1]})
ERROR: t2(UnknownTopicOrPartitionCode (3))
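
The "-c" listing is easy to filter in a pipeline. The sketch below picks out topics whose first column is 0. It assumes that the first column is a message count (as with kafka-info -c) and that the columns are whitespace-separated as shown above. empty_topics is a hypothetical helper name, not a kafka.cr command.

```shell
# Print the names of topics whose count column is 0.
# Assumption: input lines look like "<count> <topic>", as in the -c output.
empty_topics() {
  awk '$1 == 0 { print $2 }'
}

# Sample input copied from the output shown above. With a reachable broker:
#   ./bin/kafka-topics -c | empty_topics
empty_topics <<'EOF'
0       t1
6       tmp
EOF
```

With the sample input this prints "t1".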

kafka-ping

  • bin/kafka-ping works like the unix ping command
% ./bin/kafka-ping localhost
Kafka PING localhost:9092 (by HeartbeatRequest)
[2016-01-28 00:27:30 +0000] errno=16 from localhost:9092 req_seq=1 time=7.354 ms
[2016-01-28 00:27:31 +0000] errno=16 from localhost:9092 req_seq=2 time=3.433 ms
^C
--- localhost:9092 kafka ping statistics ---
2 requests transmitted, 2 received, ok: 2, error: 0
  • the -g option can be used to check the broker version
% ./bin/kafka-ping localhost -g
Kafka PING localhost:9092 (by HeartbeatRequest)
[2016-01-28 00:29:16 +0000] (0.8.x) from localhost:9092 req_seq=1 time=8.459 ms
...
  • state changes are reported to stderr
% ./bin/kafka-ping localhost -g
(stdout)
[2016-01-28 00:30:32 +0000] (0.8.x) from localhost:9092 req_seq=76 time=3.194 ms
[2016-01-28 00:30:33 +0000] (0.8.x) from localhost:9092 req_seq=77 time=3.122 ms
[2016-01-28 00:30:34 +0000] (broker is down) from localhost:9092 req_seq= time=0.511 ms
(stderr)
[2016-01-28 00:30:34 +0000] localhost:9092 : (0.8.x) -> (broker is down)
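
Because the stdout lines have a regular shape, a captured kafka-ping log can be summarized after the fact. This is a rough sketch, assuming the "time=<ms> ms" field and the "(broker is down)" marker appear exactly as in the output above. ping_summary is a hypothetical helper name, not part of kafka.cr.

```shell
# Summarize kafka-ping stdout lines read from stdin.
# Assumption: reply lines contain a "time=<ms>" field, and failed probes
# contain the literal text "(broker is down)".
ping_summary() {
  awk '
    /\(broker is down\)/ { down++; next }   # count failures, skip their time
    {
      for (i = 1; i <= NF; i++)
        if ($i ~ /^time=/) { sub(/^time=/, "", $i); sum += $i; n++ }
    }
    END {
      if (n > 0) printf "replies=%d down=%d avg_ms=%.2f\n", n, down, sum / n
      else       printf "replies=0 down=%d\n", down
    }
  '
}

# Sample input copied from the stdout lines shown above.
ping_summary <<'EOF'
[2016-01-28 00:30:32 +0000] (0.8.x) from localhost:9092 req_seq=76 time=3.194 ms
[2016-01-28 00:30:33 +0000] (0.8.x) from localhost:9092 req_seq=77 time=3.122 ms
[2016-01-28 00:30:34 +0000] (broker is down) from localhost:9092 req_seq= time=0.511 ms
EOF
```

With the sample input this prints "replies=2 down=1 avg_ms=3.16". The time reported on a "broker is down" line is deliberately left out of the average.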

Development

crystal deps
make

Unit Test

  • make test

CI : test with real brokers

ci-setup prepares ZooKeeper and Kafka brokers using docker-compose.

  • https://github.com/wurstmeister/kafka-docker/blob/master/docker-compose-single-broker.yml
make ci-setup    # starts kafka with `ci/docker-compose.yml`
make ci
make ci-teardown # stop above kafka brokers

Contributing

  1. Fork it ( https://github.com/maiha/kafka.cr/fork )
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

Contributors

  • maiha - creator, maintainer
License MIT
Dependencies 1

  • msgpack
    {'github' => 'benoist/msgpack-crystal'}

Development Dependencies 1

  • spec2~maiha crystal-0.24
    {'branch' => 'crystal-0.24', 'github' => 'maiha/spec2.cr'}
