# Haskakafka

Kafka bindings for Haskell backed by the librdkafka C module. It has been tested and fully supports Kafka 0.9.0.1 using librdkafka 0.9.0.99 and higher on Linux and OS X. Haskakafka supports both producers and consumers, with optional batch operations.

Source: https://github.com/turtlesoupy/haskakafka (Haskell 100.0%)

Hackage: http://hackage.haskell.org/package/haskakafka

## Usage

A quick walkthrough of producers and consumers:

```haskell
import Haskakafka

import qualified Data.ByteString.Char8 as C8

example :: IO ()
example = do
  let
    -- Optionally, we can configure certain parameters for Kafka
    kafkaConfig = [("socket.timeout.ms", "50000")]
    topicConfig = [("request.timeout.ms", "50000")]

    -- Payloads are just ByteStrings
    samplePayload = C8.pack "Hello world"

  -- withKafkaProducer opens a producer connection and gives us
  -- two objects for subsequent use.
  withKafkaProducer kafkaConfig topicConfig
                    "localhost:9092" "test_topic"
                    $ \kafka topic -> do

    -- Produce a single unkeyed message to partition 0
    let message = KafkaProduceMessage samplePayload
    _ <- produceMessage topic (KafkaSpecifiedPartition 0) message

    -- Produce a single keyed message
    let keyMessage = KafkaProduceKeyedMessage (C8.pack "Key") samplePayload
    _ <- produceKeyedMessage topic keyMessage

    -- We can also use the batch API for better performance
    _ <- produceMessageBatch topic KafkaUnassignedPartition [message, keyMessage]

    putStrLn "Done producing messages, here was our config:"
    dumpConfFromKafka kafka >>= \d -> putStrLn $ "Kafka config: " ++ show d
    dumpConfFromKafkaTopic topic >>= \d -> putStrLn $ "Topic config: " ++ show d

  -- withKafkaConsumer opens a consumer connection and starts consuming
  let partition = 0
  withKafkaConsumer kafkaConfig topicConfig
                    "localhost:9092" "test_topic"
                    partition            -- locked to a specific partition for each consumer
                    KafkaOffsetBeginning -- start reading from the beginning (alternatively, use
                                         -- KafkaOffsetEnd, KafkaOffset or KafkaOffsetStored)
                    $ \kafka topic -> do

    -- Consume a single message at a time
    let timeoutMs = 1000
    me <- consumeMessage topic partition timeoutMs
    case me of
      Left err -> putStrLn $ "Uh oh, an error! " ++ show err
      Right m  -> putStrLn $ "Woo, payload was " ++ C8.unpack (messagePayload m)

    -- For better performance, consume in batches
    let maxMessages = 10
    mes <- consumeMessageBatch topic partition timeoutMs maxMessages
    case mes of
      Left err -> putStrLn $ "Something went wrong in batch consume! " ++ show err
      Right ms -> putStrLn $ "Woohoo, we got " ++ show (length ms) ++ " messages"

    -- Be a little less noisy
    setLogLevel kafka KafkaLogCrit

  -- We can also fetch metadata about our Kafka infrastructure
  let timeoutMs = 1000
  emd <- fetchBrokerMetadata [] "localhost:9092" timeoutMs
  case emd of
    Left err -> putStrLn $ "Uh oh, error time: " ++ show err
    Right md -> putStrLn $ "Kafka metadata: " ++ show md
```

## Configuration Options

Configuration options are set in the calls to `withKafkaProducer` and `withKafkaConsumer`, as lists of key-value string pairs (the `kafkaConfig` and `topicConfig` values in the example above).

## High Level Consumers

High-level consumers are supported by librdkafka starting from version 0.9. High-level consumers can handle more than one partition and even more than one topic. Scalability and rebalancing are taken care of by librdkafka: once a new consumer in the same consumer group is started, a rebalance happens and all consumers share the load. This version of Haskakafka adds (experimental) support for high-level consumers; here is how such a consumer can be used in code:

```haskell
import Haskakafka
import Haskakafka.Consumer

runConsumerExample :: IO ()
runConsumerExample = do
  res <- runConsumer
           (ConsumerGroupId "test_group")   -- group id is required
           []                               -- extra kafka conf properties
           (BrokersString "localhost:9092") -- kafka brokers to connect to
           [TopicName "^hl-test*"]          -- list of topics to consume, supporting regex
           processMessages                  -- handler to consume messages
  print res

-- This function is used inside the consumer and is responsible for
-- polling and handling messages. In this case we do 10 polls and
-- then return a success.
processMessages :: Kafka -> IO (Either KafkaError ())
processMessages kafka = do
  mapM_ (\_ -> do
            msg1 <- pollMessage kafka 1000
            print msg1)
        [1..10]
  return $ Right ()
```
## Installation

### Installing librdkafka

Although librdkafka is available on many platforms, most of the distribution packages are too old to support haskakafka. As such, we suggest you install it from source:
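The exact commands were lost in extraction; a typical from-source build (an assumption based on librdkafka's standard autotools workflow, cloning from its upstream repository) looks like:

```shell
# Fetch and build librdkafka from source, then install system-wide
git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make
sudo make install
```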
If the C++ bindings fail for you, just install the C bindings alone.

On Debian and OS X, this will install the shared and static libraries to

### Installing Kafka

The full Kafka guide is at http://kafka.apache.org/documentation.html#quickstart

### Installing Haskakafka

If you want to use cabal—since haskakafka uses
Afterwards, installation should work, so go for:
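The original command was lost in extraction; given the following sentence refers to Hackage, it was presumably the standard install (an assumption):

```shell
# Install the latest release from Hackage
cabal install haskakafka
```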
This uses the latest version of Haskakafka from Hackage.

## Testing

Haskakafka ships with a suite of integration tests to verify the library against a live Kafka instance. To get these set up, you must have a broker running on

To get a broker running, download a Kafka distribution and untar it into a directory. From there, run ZooKeeper using
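The command was elided in extraction; per the Kafka quickstart, the usual ZooKeeper invocation (run from the untarred distribution directory) is:

```shell
# Start ZooKeeper with the bundled default config
bin/zookeeper-server-start.sh config/zookeeper.properties
```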
and run Kafka in a separate window using
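Again per the Kafka quickstart (an assumption; the original command was elided), the broker itself is typically started with:

```shell
# Start the Kafka broker with the bundled default config
bin/kafka-server-start.sh config/server.properties
```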
With both Kafka and Zookeeper running, you can run tests through stack:
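The elided command is presumably the standard stack test invocation (an assumption; the test suite target name may vary):

```shell
stack test
```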
You can also run tests through cabal:
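Likewise, presumably the standard cabal invocation (an assumption):

```shell
cabal test
```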
## Running Examples

The following will produce 11 messages on partition 5 for topic

The following will consume 11 messages on partition 5 for topic

The following will pretty print a list of all brokers and topics: