How is Kafka's ack/fail message mechanism guaranteed? Tracing the Kafka producer callback in the source code: why is no exception thrown?
Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design. In other words, Kafka is a high-throughput distributed publish/subscribe messaging system with the following characteristics:
(1) Message persistence through an O(1) disk data structure; this structure keeps performance stable over time even with terabytes of stored messages.
(2) High throughput: even on very modest hardware, Kafka can handle hundreds of thousands of messages per second.
(3) Messages can be partitioned across Kafka brokers and consumed in parallel by a cluster of consumer machines.
(4) Support for parallel data loading into Hadoop.
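Coming back to the question in the title: in the Kafka producer, send() is asynchronous. A failed send therefore does not throw an exception at the call site; the broker's ack (or the failure) is delivered to the Callback passed to send(), or surfaces when you call get() on the returned Future. Below is a minimal sketch, assuming the Java producer client (org.apache.kafka.clients.producer, shipped from Kafka 0.8.2 onward, slightly newer than the 0.8.1.1 source built below); the broker address and topic name are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AckCallbackDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("acks", "all");   // wait until all in-sync replicas have acknowledged
        props.put("retries", "3");  // retry transient failures before reporting them
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // send() returns immediately; a broker-side failure does NOT throw here.
        producer.send(new ProducerRecord<>("test-topic", "key", "value"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // This is where an asynchronous send failure actually surfaces.
                    System.err.println("send failed: " + exception);
                } else {
                    System.out.printf("acked: partition=%d, offset=%d%n",
                            metadata.partition(), metadata.offset());
                }
            }
        });
        producer.close();
    }
}

This is why tracing the producer call site shows no exception: errors travel through the (RecordMetadata, Exception) pair handed to the callback, and only producer.send(record).get() rethrows them synchronously.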
1. Building Kafka with its bundled script
After downloading the Kafka source code, there is a gradlew script inside, which we can use to build Kafka:
# wget
# tar -zxf kafka-0.8.1.1-src.tgz
# cd kafka-0.8.1.1-src
# ./gradlew releaseTarGz
Running the above command produces the following error:
:core:signArchives FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:signArchives'.
> Cannot perform signing task ':core:signArchives' because it has no configured signatory

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED
This is a known bug; use the following command to build instead:
./gradlew releaseTarGzAll -x signArchives
This time the build succeeds (the build prints a lot of output along the way). During the build we can also specify the corresponding Scala version:
./gradlew -PscalaVersion=2.10.3 releaseTarGz -x signArchives
After the build finishes, a kafka_2.10-0.8.1.1.tgz file is generated under core/build/distributions/; it is identical to the one available for download online and can be used directly.
2. Building with sbt
We can likewise build Kafka with sbt; the steps are as follows:
# git clone
# cd kafka
# git checkout -b 0.8 remotes/origin/0.8
# ./sbt update
[info] [SUCCESSFUL ] org.eclipse.jdt#core;3.1.1!core.jar (2243ms)
[info] downloading ...
[info] [SUCCESSFUL ] ant#ant;1.6.5!ant.jar (1150ms)
[info] Done updating.
[info] Resolving org.apache.hadoop#hadoop-core;0.20.2 ...
[info] Done updating.
[info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
[info] Done updating.
[info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
[info] Done updating.
[success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM

# ./sbt package
[info] Set current project to Kafka (in build file:/export1/spark/kafka/)
Getting Scala 2.8.0 ...
:: retrieving :: org.scala-sbt#boot-scala
  confs: [default]
  3 artifacts copied, 0 already retrieved (14544kB/27ms)
[success] Total time: 1 s, completed Jun 18, 2014 6:52:37 PM
For Kafka 0.8 and later you also need to run the following command:
# ./sbt assembly-package-dependency
[info] Loading project definition from /export1/spark/kafka/project
[warn] Multiple resolvers having different access mechanism configured with same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
[info] Set current project to Kafka (in build file:/export1/spark/kafka/)
[warn] Credentials file /home/wyp/.m2/.credentials does not exist
[info] Including slf4j-api-1.7.2.jar
[info] Including metrics-annotation-2.2.0.jar
[info] Including scala-compiler.jar
[info] Including scala-library.jar
[info] Including slf4j-simple-1.6.4.jar
[info] Including metrics-core-2.2.0.jar
[info] Including snappy-java-1.0.4.1.jar
[info] Including zookeeper-3.3.4.jar
[info] Including log4j-1.2.15.jar
[info] Including zkclient-0.3.jar
[info] Including jopt-simple-3.2.jar
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
[warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE' with strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Strategy 'discard' was applied to a file
[warn] Strategy 'rename' was applied to 5 files
[success] Total time: 3 s, completed Jun 18, 2014 6:53:41 PM
We can also specify the Scala version in sbt:
sbt "++2.10.3 update"
sbt "++2.10.3 package"
sbt "++2.10.3 assembly-package-dependency"
Related Q&A:
聞憑15278873982: How do you choose an appropriate number of partitions for a Kafka cluster?
明水縣凹面: Deciding the number of topics and partitions in a Kafka cluster is a question many Kafka users run into. Here are several important deciding factors, for reference. More partitions means higher throughput: the partitions of a topic are parallel to one another, and on the producer and broker side, writes to different partitions...
聞憑15278873982: What messaging-queue protocol does Kafka implement?
明水縣凹面: Kafka is a distributed message queue; similar products include JBoss MQ. 1. Open-sourced by LinkedIn and written in Scala, it has the following characteristics: (1) high throughput; (2) distributed; (3) multi-language client support (C++, Java). 2. Components: the clients are producers and consumers, which expose a set of APIs; the server side is the broker, ...
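To make the producer/consumer/broker split concrete, here is a minimal consumer sketch against the high-level consumer API of the Kafka 0.8 Java client (kafka.javaapi.consumer); the ZooKeeper address, group id, and topic name are placeholders:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper address
        props.put("group.id", "demo-group");              // consumer group id
        props.put("auto.offset.reset", "smallest");       // start from the earliest offset

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // Ask for one stream (one consumer thread) for the topic.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("test-topic", 1));

        ConsumerIterator<byte[], byte[]> it = streams.get("test-topic").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}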
聞憑15278873982: Why does kafka-console-consumer show no records?
明水縣凹面: Note a few caveats when there are multiple partitions and multiple consumers. 1. If there are more consumers than partitions, the extra consumers are wasted: Kafka's design does not allow concurrent consumption within a single partition, so the number of consumers should not exceed the number of partitions. 2. If there are fewer consumers than partitions, one consumer...
聞憑15278873982: How do you choose an appropriate number of topics/partitions for a Kafka cluster?
明水縣凹面: First we need to understand the following fact: in Kafka, a single partition is the smallest unit of parallel operation. On the producer and broker side, writes to each partition can be fully parallelized, so throughput can be raised by making better use of hardware resources, for example by compressing the data. ...
聞憑15278873982: Why are topics in Kafka partitioned?
明水縣凹面: Without partitions, the message set of a topic would be distributed unevenly across the cluster: one server A might end up holding most of the topic's messages, and if the topic is under heavy load, server A comes under heavy pressure and easily becomes a throughput bottleneck. With partitions, suppose a topic is split into 10 partitions; Kafka internally uses an algorithm to spread those 10 partitions as evenly as possible across different servers, e.g. server A handles partition 1 of the topic and server B handles partition 2. In that case, when the producer sends a message without specifying a target partition, Kafka picks one by an algorithm: one message may go to partition 1, the next to partition 2. The high-level API also lets you implement your own distribution algorithm.
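Since the answer mentions implementing your own distribution algorithm, here is a sketch of a custom partitioner. It assumes the pluggable Partitioner interface of the newer Java producer client (registered via the partitioner.class property), which postdates the 0.8.1.1 code built above; the hashing mirrors the default key-hash behaviour:

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class KeyHashPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: spread messages across partitions at random.
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Keyed message: the same key always lands in the same partition.
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() { }
}

The producer picks it up via props.put("partitioner.class", "KeyHashPartitioner").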
聞憑15278873982: What are the ways to get data out of Kafka?
明水縣凹面: 1. The receiver-based approach. This approach uses a Receiver to fetch the data; the Receiver is implemented with Kafka's high-level consumer API. The data the receiver pulls from Kafka is stored in the memory of the Spark executors, and the jobs launched by Spark Streaming then process that data. ...
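A minimal sketch of that receiver-based approach, assuming the spark-streaming-kafka artifact for Spark 1.x; the ZooKeeper address, group id, and topic name are placeholders:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaReceiverDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("kafka-receiver-demo").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Integer> topics = new HashMap<>();
        topics.put("test-topic", 1); // topic -> number of receiver threads

        // Receiver-based stream: built on Kafka's high-level consumer API;
        // offsets are tracked in ZooKeeper, fetched data sits in executor memory.
        JavaPairReceiverInputDStream<String, String> stream =
                KafkaUtils.createStream(jssc, "localhost:2181", "demo-group", topics);

        stream.map(record -> record._2()).print(); // print the message values

        jssc.start();
        jssc.awaitTermination();
    }
}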
聞憑15278873982: What ETL should be used to import data from Oracle into Kafka?
明水縣凹面: You can use PyMySQL instead. To install it: pip install PyMySQL. Then, in the project that needs it, add two lines to __init__.py: import pymysql and pymysql.install_as_MySQLdb(). After that you can import MySQLdb; the other methods work the same as MySQLdb's.
聞憑15278873982: In what scenarios is Apache Kafka used?
明水縣凹面: 1. Messaging. For ordinary messaging workloads, Kafka is a good choice: partitions/replication and fault tolerance give Kafka good scalability and performance advantages. That said, by now we should clearly recognize that Kafka does not provide the "transactionality" or "message delivery guarantees" of JMS (message...