Open source name: haskell-works/hw-kafka-client
Open source URL: https://github.com/haskell-works/hw-kafka-client
Open source language: Haskell 94.6%
Open source introduction:

hw-kafka-client

Kafka bindings for Haskell backed by the librdkafka C module.

Ecosystem

The HaskellWorks Kafka ecosystem is described here: https://github.com/haskell-works/hw-kafka

Consumer

High level consumers are supported by

Consumer example

See Running integration tests locally to learn how to configure a local environment.

cabal build --flag examples

or

cabal run kafka-client-example --flag examples

A working consumer example can be found here: ConsumerExample.hs

import Control.Exception (bracket)
import Control.Monad (replicateM_)
import Kafka.Consumer

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList ["localhost:9092"]
             <> groupId "consumer_example_group"
             <> noAutoCommit
             <> logLevel KafkaLogInfo

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics ["kafka-client-example-topic"]
           <> offsetReset Earliest

-- Running an example
runConsumerExample :: IO ()
runConsumerExample = do
    res <- bracket mkConsumer clConsumer runHandler
    print res
    where
      mkConsumer = newConsumer consumerProps consumerSub
      clConsumer (Left err) = return (Left err)
      clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
      runHandler (Left err) = return (Left err)
      runHandler (Right kc) = processMessages kc

-------------------------------------------------------------------
processMessages :: KafkaConsumer -> IO (Either KafkaError ())
processMessages kafka = do
    replicateM_ 10 $ do
      msg <- pollMessage kafka (Timeout 1000)
      putStrLn $ "Message: " <> show msg
      err <- commitAllOffsets OffsetCommit kafka
      putStrLn $ "Offsets: " <> maybe "Committed." show err
    return $ Right ()

Producer
A working producer example can be found here: ProducerExample.hs

Delivery reports

Kafka Producer maintains its own internal queue for outgoing messages. Calling

However, it is not always possible for the producer to send messages to Kafka. Network problems or the Kafka cluster being offline can prevent it from doing so.

When a message cannot be sent to Kafka for some time (see

It is possible to configure

producerProps :: ProducerProperties
producerProps = brokersList ["localhost:9092"]
             <> sendTimeout (Timeout 0) -- for librdkafka "0" means "infinite" (see https://github.com/edenhill/librdkafka/issues/2015)

Delivery reports provide a way to detect when the producer experiences problems sending messages to Kafka. Currently

producerProps :: ProducerProperties
producerProps = brokersList ["localhost:9092"]
             <> setCallback (deliveryCallback print)

In the example above, when the producer cannot deliver a message to Kafka, the error is printed (and the message is dropped).

Producer example

{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (bracket)
import Control.Monad (forM_)
import Data.ByteString (ByteString)
import Kafka.Producer

-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList ["localhost:9092"]
             <> logLevel KafkaLogDebug

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = "kafka-client-example-topic"

-- Run an example
runProducerExample :: IO ()
runProducerExample =
    bracket mkProducer clProducer runHandler >>= print
    where
      mkProducer = newProducer producerProps
      clProducer (Left _) = return ()
      clProducer (Right prod) = closeProducer prod
      runHandler (Left err) = return $ Left err
      runHandler (Right prod) = sendMessages prod

sendMessages :: KafkaProducer -> IO (Either KafkaError ())
sendMessages prod = do
  err1 <- produceMessage prod (mkMessage Nothing (Just "test from producer"))
  forM_ err1 print

  err2 <- produceMessage prod (mkMessage (Just "key") (Just "test from producer (with key)"))
  forM_ err2 print

  return $ Right ()

mkMessage :: Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage k v = ProducerRecord
                  { prTopic = targetTopic
                  , prPartition = UnassignedPartition
                  , prKey = k
                  , prValue = v
                  }

Synchronous sending of messages

Because of the asynchronous nature of librdkafka, there is no API to provide
synchronous production of messages. It is, however, possible to combine the
delivery reports feature with that of callbacks. This can be done using the

produceMessage' :: MonadIO m
                => KafkaProducer
                -> ProducerRecord
                -> (DeliveryReport -> IO ())
                -> m (Either ImmediateError ())

Using this function, you can provide a callback which will be invoked upon the
produced message's delivery report. With a little help of

{-# LANGUAGE LambdaCase #-}
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad.IO.Class (MonadIO, liftIO)

sendMessageSync :: MonadIO m
                => KafkaProducer
                -> ProducerRecord
                -> m (Either KafkaError Offset)
sendMessageSync producer record = liftIO $ do
  -- Create an empty MVar:
  var <- newEmptyMVar

  -- Produce the message and use the callback to put the delivery report in the
  -- MVar:
  res <- produceMessage' producer record (putMVar var)

  case res of
    Left (ImmediateError err) ->
      pure (Left err)
    Right () -> do
      -- Flush producer queue to make sure you don't get stuck waiting for the
      -- message to send:
      flushProducer producer

      -- Wait for the message's delivery report and map accordingly:
      takeMVar var >>= return . \case
        DeliverySuccess _ offset -> Right offset
        DeliveryFailure _ err -> Left err
        NoMessageError err -> Left err

Note: this is a semi-naive solution, as it waits forever (or until librdkafka times out). Make sure that your configuration reflects the behavior you want from this function.

Running integration tests locally

shell.nix can be used to provide a working environment that is enough to build and test

To be able to run tests locally,

With Docker Compose this variable is used to configure the Kafka broker to listen on this address.

After that, integration tests can be switched on using the 'it' flag.
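
Returning to the note on synchronous sending above: it warns that `sendMessageSync` blocks until a delivery report arrives. One way to bound that wait, sketched here under assumptions, is to wrap the `takeMVar` in `timeout` from `System.Timeout` (part of base). The names `awaitWithin` and `simulateReport` are hypothetical and not part of hw-kafka-client; the background thread only stands in for the delivery callback so that the sketch runs without a broker.

```haskell
import Control.Concurrent      (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import System.Timeout          (timeout)

-- | Hypothetical helper (not part of hw-kafka-client): wait for a value that
-- a callback will put into the MVar, giving up after the given number of
-- microseconds. Returns Nothing when the wait timed out.
awaitWithin :: Int -> MVar a -> IO (Maybe a)
awaitWithin micros var = timeout micros (takeMVar var)

-- | Stand-in for a delivery callback (an assumption made for this sketch):
-- a background thread fills the MVar after a delay, much like passing
-- `putMVar var` as the callback to produceMessage'.
simulateReport :: Int -> a -> IO (MVar a)
simulateReport delayMicros report = do
  var <- newEmptyMVar
  _ <- forkIO (threadDelay delayMicros >> putMVar var report)
  pure var
```

In `sendMessageSync`, `takeMVar var` could then become `awaitWithin micros var`, with `Nothing` mapped to whatever error the caller prefers. Giving up on the wait does not cancel the message, which may still be delivered later. `flushProducer` runs before the wait and may itself block, so a real implementation would likely need to bound that step as well.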
Credits

This project is inspired by Haskakafka which unfortunately doesn't seem to be actively maintained.