
Hadoop Streaming


Hadoop streaming is a utility that ships with Hadoop. It lets users create and run a special class of map/reduce jobs in which an executable or script acts as the mapper and/or the reducer. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc

How Streaming Works

In the above example, both the mapper and the reducer are executables that read input from standard input (line by line) and emit output to standard output. The Streaming utility creates a Map/Reduce job, submits it to an appropriate cluster, and monitors the progress of the job until it completes.

When an executable is specified for the mapper, each mapper task launches the executable as a separate process when the mapper is initialized. As the mapper task runs, it converts its input into lines and feeds the lines to the standard input of the process. In the meantime, the mapper collects the line-oriented output from the standard output of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab) is the value. If the line contains no tab character, the entire line becomes the key and the value is null. This behavior can be customized, as discussed later.

When an executable is specified for the reducer, each reducer task launches the executable as a separate process when the reducer is initialized. As the reducer task runs, it converts its input key/value pairs into lines and feeds the lines to the standard input of the process. In the meantime, the reducer collects the line-oriented output from the standard output of the process and converts each line into a key/value pair, which is collected as the output of the reducer. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab) is the value. How to customize this splitting is also discussed later.

This is the basis of the communication protocol between the Map/Reduce framework and the streaming mapper/reducer.
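For concreteness, here is a minimal, hypothetical pair of Python scripts that follow this protocol (the script names and the word-count logic are illustrative assumptions, not part of the example above):

#!/usr/bin/env python
# wordcount_mapper.py -- minimal streaming mapper sketch (illustrative only).
# Reads lines from stdin and writes "word<TAB>1" lines to stdout; the framework
# treats the text before the first tab as the key and the rest as the value.
import sys

for line in sys.stdin:
    for word in line.split():
        print(word + "\t" + "1")

#!/usr/bin/env python
# wordcount_reducer.py -- minimal streaming reducer sketch (illustrative only).
# Input lines arrive sorted by key, so all counts for one word are contiguous.
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t", 1)
    if word != current_word:
        if current_word is not None:
            print(current_word + "\t" + str(current_count))
        current_word, current_count = word, 0
    current_count += int(count)
if current_word is not None:
    print(current_word + "\t" + str(current_count))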

You can also supply a Java class as the mapper and/or the reducer. The above example is equivalent to:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc

You can set stream.non.zero.exit.is.failure to true or false to make a streaming task that exits with a non-zero status be treated as Failure or Success respectively. By default, a streaming task exiting with a non-zero status is considered a failed task.
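As a hedged illustration, a mapper like the following hypothetical script would cause its task to be marked failed under the default setting, because it exits with a non-zero status when it encounters a malformed record:

#!/usr/bin/env python
# strict_mapper.py -- illustrative only: exits non-zero on bad input, which
# marks the task as failed unless stream.non.zero.exit.is.failure=false.
import sys

for line in sys.stdin:
    parts = line.rstrip("\n").split("\t")
    if len(parts) < 2:
        sys.stderr.write("malformed record: " + line)
        sys.exit(1)   # non-zero exit status => task failure by default
    print(parts[0] + "\t" + parts[1])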

Packaging Files With Job Submissions

Any executable can be specified as the mapper or the reducer. The executables do not need to pre-exist on the machines in the cluster; if they do not, use the -file option to tell the framework to pack the executable files as part of the job submission. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myPythonScript.py \
    -reducer /bin/wc \
    -file myPythonScript.py 

The above example specifies a user-written executable Python file as the mapper. The option "-file myPythonScript.py" causes the Python executable to be shipped to the cluster machines as part of the job submission.
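The contents of myPythonScript.py are not shown in the original example; a minimal sketch of what such a mapper might look like (purely hypothetical) is:

#!/usr/bin/env python
# A hypothetical myPythonScript.py: echoes each input line as "line<TAB>1",
# so the /bin/wc reducer simply counts whatever it receives on stdin.
import sys

for line in sys.stdin:
    print(line.rstrip("\n") + "\t" + "1")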

In addition to executables, other auxiliary files needed by the mapper or reducer (such as dictionaries, configuration files, etc.) can be packaged and shipped the same way. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myPythonScript.py \
    -reducer /bin/wc \
    -file myPythonScript.py \
    -file myDictionary.txt

Streaming Options and Usage

鍙娇鐢∕apper鐨勪綔涓

Sometimes you only need the map function to process the input data. To do this, simply set mapred.reduce.tasks to zero. The Map/Reduce framework will not create any reducer tasks, and the output of the mapper tasks becomes the final output of the job.

For backwards compatibility, Hadoop Streaming also supports the "-reduce NONE" option, which is equivalent to "-jobconf mapred.reduce.tasks=0".

Specifying Other Plugins for Jobs

Just as with a normal Map/Reduce job, you can specify other plugins for a streaming job:

   -inputformat JavaClassName
   -outputformat JavaClassName
   -partitioner JavaClassName
   -combiner JavaClassName

The class you supply for the input format should return key/value pairs of Text class. If no input format class is specified, TextInputFormat is used as the default. Since the keys returned by TextInputFormat are of LongWritable class and are not actually part of the input data (they are the byte offsets of the values), the keys are discarded and only the values are piped to the streaming mapper.

The user-supplied output format class is expected to take key/value pairs of Text class. If no output format class is specified, TextOutputFormat is used as the default.

Large Files and Archives in Hadoop Streaming

The -cacheFile and -cacheArchive options distribute files and archives to the tasks on the cluster. The argument of either option is a URI to a file or archive that you have already uploaded to HDFS. These files and archives are cached across jobs. You can obtain the host and fs_port values from the fs.default.name configuration variable.

Here is an example of the -cacheFile option:

-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink

In the above example, the part of the URI after "#" is the name of the symlink that is created in the current working directory of the task. The task's current working directory thus contains a symlink called "testlink" that points to the local copy of testfile.txt. If you have multiple files, the option can be written as:

-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2

The -cacheArchive option copies a jar file to the current working directory of the tasks and automatically unjars it. For example:

-cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3

In the above example, testlink3 is a symlink in the current working directory of the tasks; it points to the directory in which the jar file is unjarred.

Here is another example of the -cacheArchive option. The input.txt file has two lines, each naming a file: testlink/cache.txt and testlink/cache2.txt. "testlink" is a symlink to the archive directory (the directory created by unjarring the jar file), and that directory contains the two files cache.txt and cache2.txt.

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
                  -input "/user/me/samples/cachefile/input.txt"  \
                  -mapper "xargs cat"  \
                  -reducer "cat"  \
                  -output "/user/me/samples/cachefile/out" \
                  -cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \
                  -jobconf mapred.map.tasks=1 \
                  -jobconf mapred.reduce.tasks=1 \
                  -jobconf mapred.job.name="Experiment"

$ ls test_jar/
cache.txt  cache2.txt

$ jar cvf cachedir.jar -C test_jar/ .
added manifest
adding: cache.txt(in = 30) (out= 29)(deflated 3%)
adding: cache2.txt(in = 37) (out= 35)(deflated 5%)

$ hadoop dfs -put cachedir.jar samples/cachefile

$ hadoop dfs -cat /user/me/samples/cachefile/input.txt
testlink/cache.txt
testlink/cache2.txt

$ cat test_jar/cache.txt 
This is just the cache string

$ cat test_jar/cache2.txt 
This is just the second cache string

$ hadoop dfs -ls /user/me/samples/cachefile/out      
Found 1 items
/user/me/samples/cachefile/out/part-00000  <r 3>   69

$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000
This is just the cache string   
This is just the second cache string

Specifying Additional Configuration Variables for Jobs

You can specify additional configuration variables with "-jobconf <n>=<v>". For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper\
    -reducer /bin/wc \
    -jobconf mapred.reduce.tasks=2

In the above example, -jobconf mapred.reduce.tasks=2 specifies that two reducers will be used for the job.

For more details on jobconf parameters see: hadoop-default.html

Other Supported Options

Other options for a streaming job are listed in the following table:

Option                          Optional/Required   Description
-cluster name                   Optional            Switch between local Hadoop and one or more remote clusters
-dfs host:port or local         Optional            Override the HDFS configuration for the job
-jt host:port or local          Optional            Override the JobTracker configuration for the job
-additionalconfspec specfile    Optional            Specify a set of configuration variables in an XML file similar to hadoop-site.xml, instead of using multiple "-jobconf name=value" options to set each variable separately
-cmdenv name=value              Optional            Pass an environment variable to the streaming commands
-cacheFile fileNameURI          Optional            Specify a file that has been uploaded to HDFS
-cacheArchive fileNameURI       Optional            Specify a jar file that has been uploaded to HDFS; this jar file is unjarred automatically in the current working directory of the task
-inputreader JavaClassName      Optional            For backwards compatibility: specify a record reader class (instead of an input format class)
-verbose                        Optional            Verbose output

Use -cluster <name> to switch between the "local" Hadoop and one or more remote Hadoop clusters. By default, hadoop-default.xml and hadoop-site.xml are used; when the -cluster <name> option is given, $HADOOP_HOME/conf/hadoop-<name>.xml is used instead.

The following option changes the temp directory:

  -jobconf dfs.data.dir=/tmp

The following options specify additional local temp directories:

   -jobconf mapred.local.dir=/tmp/local
   -jobconf mapred.system.dir=/tmp/system
   -jobconf mapred.temp.dir=/tmp/temp

鏇村鏈夊叧jobconf鐨勭粏鑺傝鍙傝冿細http://wiki.apache.org/hadoop/JobConfFile

To set an environment variable in a streaming command:

-cmdenv EXAMPLE_DIR=/home/example/dictionaries/

More Usage Examples

Customizing the Way to Split Lines into Key/Value Pairs

As mentioned earlier, when the Map/Reduce framework reads a line from the stdout of the mapper, it splits the line into a key/value pair. By default, the prefix of the line up to the first tab character is the key and the rest of the line (excluding the tab) is the value.

However, you can customize this default. You can specify a field separator other than the default tab character, and you can specify that the nth (n >= 1) occurrence of the separator, rather than the first, marks the boundary between key and value. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -jobconf stream.map.output.field.separator=. \
    -jobconf stream.num.map.output.key.fields=4 

In the above example, "-jobconf stream.map.output.field.separator=." specifies "." as the field separator for the map outputs, and the prefix up to the fourth "." in a line is the key while the rest of the line (excluding the fourth ".") is the value. If a line has fewer than four ".", the whole line becomes the key and the value is an empty Text object (as created by new Text("")).

Similarly, you can use "-jobconf stream.reduce.output.field.separator=SEP" and "-jobconf stream.num.reduce.output.fields=NUM" to specify which occurrence of the separator in a reduce output line splits the key from the value.
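The splitting rule itself is easy to state in a few lines of Python. The snippet below is only an illustration of the semantics described above (it is not Hadoop code), assuming the separator "." and four key fields:

# Illustration only: split a mapper output line into key and value the way
# streaming does with stream.map.output.field.separator=. and
# stream.num.map.output.key.fields=4.
def split_key_value(line, separator=".", num_key_fields=4):
    parts = line.split(separator)
    if len(parts) <= num_key_fields:
        return line, ""          # fewer separators: the whole line is the key
    key = separator.join(parts[:num_key_fields])
    value = separator.join(parts[num_key_fields:])
    return key, value

print(split_key_value("11.12.1.2.hello.world"))  # ('11.12.1.2', 'hello.world')
print(split_key_value("11.12.1"))                # ('11.12.1', '')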

涓涓疄鐢ㄧ殑Partitioner绫 锛堜簩娆℃帓搴忥紝-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 閫夐」锛

Hadoop has a library class, org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, that is useful for many applications. This class allows the Map/Reduce framework to partition the map outputs based on prefixes of the keys rather than on the whole keys. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -jobconf stream.map.output.field.separator=. \
    -jobconf stream.num.map.output.key.fields=4 \
    -jobconf map.output.key.field.separator=. \
    -jobconf num.key.fields.for.partition=2 \
    -jobconf mapred.reduce.tasks=12

Here, -jobconf stream.map.output.field.separator=. and -jobconf stream.num.map.output.key.fields=4 are as in the earlier example. Streaming uses these two variables to identify the key/value pairs of the mapper.

The map output keys of the above Map/Reduce job normally have four fields separated by ".". However, because of the -jobconf num.key.fields.for.partition=2 option, the Map/Reduce framework partitions the map outputs on only the first two fields of the key. Here, -jobconf map.output.key.field.separator=. specifies the separator used for this partitioning. This guarantees that all key/value pairs whose keys share the same first two fields are partitioned into the same group and sent to the same reducer.

杩欑楂樻晥鐨勬柟娉曠瓑浠蜂簬鎸囧畾鍓嶄袱鍧椾綔涓轰富閿紝鍚庝袱鍧椾綔涓哄壇閿 涓婚敭鐢ㄤ簬鍒囧垎鍧楋紝涓婚敭鍜屽壇閿殑缁勫悎鐢ㄤ簬鎺掑簭銆涓涓畝鍗曠殑绀轰緥濡備笅锛

Output of the map (the keys):

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

Partitioned into 3 reducers (the first 2 fields are used for partitioning):

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

Sorted within each partition for the reducer (all 4 fields are used for sorting):

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3

Working with the Hadoop Aggregate Package (the -reduce aggregate option)

Hadoop has a library package called "Aggregate" (https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate). Aggregate provides a special reducer class and a special combiner class, together with a list of simple "aggregators" (for example "sum", "max", "min" and so on) that aggregate a sequence of values. Aggregate allows you to define a mapper plugin class that generates "aggregatable items" for each input key/value pair of the mappers. The combiner/reducer then aggregates these items using the appropriate aggregators.

To use Aggregate, simply specify "-reducer aggregate":

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myAggregatorForKeyCount.py \
    -reducer aggregate \
    -file myAggregatorForKeyCount.py \
    -jobconf mapred.reduce.tasks=12

An example of the python program myAggregatorForKeyCount.py:

#!/usr/bin/python

import sys

def generateLongCountToken(id):
    # Emit an "aggregatable item": the LongValueSum aggregator sums
    # the "1" counts for each id on the combiner/reducer side.
    return "LongValueSum:" + id + "\t" + "1"

def main(argv):
    # Read records from stdin; the first tab-separated field is the key.
    for line in sys.stdin:
        fields = line.rstrip("\n").split("\t")
        print(generateLongCountToken(fields[0]))

if __name__ == "__main__":
    main(sys.argv)

Field Selection (similar to the Unix 'cut' command)

Hadoop has a library class, org.apache.hadoop.mapred.lib.FieldSelectionMapReduce, that effectively lets you process text data the way the Unix "cut" utility does. The map function in this class treats each input key/value pair as a list of fields. You can specify the field separator (the default is the tab character) and select an arbitrary list of fields (one or more fields from the list) as the map output key or value. Similarly, the reduce function in this class treats each input key/value pair as a list of fields, and you can select an arbitrary list of fields as the reduce output key or value. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
    -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -jobconf map.output.key.field.separator=. \
    -jobconf num.key.fields.for.partition=2 \
    -jobconf mapred.data.field.separator=. \
    -jobconf map.output.key.value.fields.spec=6,5,1-3:0- \
    -jobconf reduce.output.key.value.fields.spec=0-2:5- \
    -jobconf mapred.reduce.tasks=12

The option "-jobconf map.output.key.value.fields.spec=6,5,1-3:0-" specifies how the key and value are selected for the map outputs. The key selection spec and the value selection spec are separated by ":". In this case, the map output key consists of fields 6, 5, 1, 2 and 3, and the map output value consists of all fields ("0-" means field 0 and all subsequent fields).

The option "-jobconf reduce.output.key.value.fields.spec=0-2:5-" specifies how the key and value are selected for the reduce outputs. In this case, the reduce output key consists of fields 0, 1 and 2 (corresponding to the original fields 6, 5 and 1), and the reduce output value consists of all fields starting from field 5 (corresponding to all of the original fields).
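For illustration only, here is a hypothetical Python sketch of the semantics of such a fields spec (this is not the FieldSelectionMapReduce implementation, just the selection rule described above):

# Hypothetical illustration of how a key/value fields spec such as
# "6,5,1-3:0-" selects fields from a record.
def select_fields(fields, spec):
    """Return the fields named by a spec like "6,5,1-3" or "0-"."""
    picked = []
    for part in spec.split(","):
        if part.endswith("-"):                 # open range, e.g. "0-"
            picked.extend(fields[int(part[:-1]):])
        elif "-" in part:                      # closed range, e.g. "1-3"
            lo, hi = part.split("-")
            picked.extend(fields[int(lo):int(hi) + 1])
        else:                                  # single field, e.g. "6"
            picked.append(fields[int(part)])
    return picked

record = "f0.f1.f2.f3.f4.f5.f6".split(".")
key_spec, value_spec = "6,5,1-3:0-".split(":")
print(select_fields(record, key_spec))    # ['f6', 'f5', 'f1', 'f2', 'f3']
print(select_fields(record, value_spec))  # ['f0', 'f1', ..., 'f6']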

Frequently Asked Questions

鎴戣鎬庢牱浣跨敤Hadoop Streaming杩愯涓缁勭嫭绔嬶紙鐩稿叧锛夌殑浠诲姟鍛紵

Often you do not need the full power of Map/Reduce, but only need to run multiple instances of the same program, either on different parts of the data, or on the same data with different parameters. You can use Hadoop Streaming to do this.

How do I process files, one per map?

As an example, consider the problem of compressing (zipping) a set of files across the cluster. You can achieve this with either of the following methods:

  1. Hadoop Streaming and a user-written mapper script:
    • Generate a file containing the full HDFS paths of the files to be compressed. Each map task gets one path name as its input.
    • Create a mapper script which, given a file name, copies the file to the local disk, compresses it, and puts it into the desired output directory (a sketch appears after this list).
  2. The existing Hadoop framework:
    • Add the following commands to your main function:
             FileOutputFormat.setCompressOutput(conf, true);
             FileOutputFormat.setOutputCompressorClass(conf, org.apache.hadoop.io.compress.GzipCodec.class);
             conf.setOutputFormat(NonSplitableTextInputFormat.class);
             conf.setNumReduceTasks(0);
      
    • Write your map function:
      
             public void map(WritableComparable key, Writable value, 
                                     OutputCollector output, 
                                     Reporter reporter) throws IOException {
                  output.collect((Text)value, null);
             }
      
    • Note that the output file name will differ from the original file name
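A rough sketch of the mapper script mentioned in method 1 might look like the following. It is hypothetical: it assumes the hadoop and gzip commands are on the task's PATH and that OUTPUT_DIR is the desired HDFS output directory.

#!/usr/bin/env python
# Hypothetical mapper for method 1: each input line is an HDFS path; copy it
# locally, gzip it, and put the compressed file into an output directory.
import os
import subprocess
import sys

OUTPUT_DIR = "/user/me/zipped"           # assumed output directory

for line in sys.stdin:
    path = line.strip()
    if not path:
        continue
    local = os.path.basename(path)
    subprocess.check_call(["hadoop", "dfs", "-get", path, local])
    subprocess.check_call(["gzip", local])
    subprocess.check_call(["hadoop", "dfs", "-put", local + ".gz",
                           OUTPUT_DIR + "/" + local + ".gz"])
    print(path + "\t" + "done")          # emit something so the task shows progress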

How many reducers should I use?

See the Hadoop Wiki for details: Reducer

If I set up an alias in my shell script, will that work after -mapper? For example, say I do alias c1='cut -f1'. Will -mapper "c1" work?

Using an alias will not work, but variable substitution is allowed, as shown in this example:

$ hadoop dfs -cat samples/student_marks
alice   50
bruce   70
charlie 80
dan     75

$ c2='cut -f2'; $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input /user/me/samples/student_marks \
    -mapper \"$c2\" -reducer 'cat' \
    -output /user/me/samples/student_out \
    -jobconf mapred.job.name='Experiment'

$ hadoop dfs -ls samples/student_out
Found 1 items
/user/me/samples/student_out/part-00000    <r 3>   16

$ hadoop dfs -cat samples/student_out/part-00000
50
70
75
80

Can I use UNIX pipes? For example, will -mapper "cut -f1 | sed s/foo/bar/g" work?

Currently this does not work and gives a "java.io.IOException: Broken pipe" error. This may be a bug that needs further investigation.

When I run a streaming job distributing a very large executable (for example, 3.6 GB) through the -file option, I get a "No space left on device" error. What do I do?

The configuration variable stream.tmpdir specifies the directory in which the jar packaging is done. The default value of stream.tmpdir is /tmp; set it to a directory with more space:

-jobconf stream.tmpdir=/export/bigspace/...

How do I specify multiple input directories?

鍙互浣跨敤澶氫釜-input閫夐」璁剧疆澶氫釜杈撳叆鐩綍锛

 hadoop jar hadoop-streaming.jar -input '/user/foo/dir1' -input '/user/foo/dir2' 

How do I generate output files in gzip format?

Instead of plain-text output, you can also generate gzip files as output. Just set the streaming job options '-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec'.

How do I provide my own input/output format with streaming?

At least as late as version 0.14, Hadoop does not support multiple jar files. So when specifying your own custom classes, you have to pack them together with the existing streaming jar and use this custom jar instead of the default hadoop streaming jar.

How does streaming parse XML documents?

You can use StreamXmlRecordReader to parse XML documents:

hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)

Map tasks treat everything between BEGIN_STRING and END_STRING as one record.

How do I update counters in a streaming application?

A streaming process can use stderr to emit counter information: send reporter:counter:<group>,<counter>,<amount> to stderr to update a counter.

How do I update the status of a streaming application?

A streaming process can use stderr to emit status information: send reporter:status:<message> to stderr to set the status.
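A minimal, hypothetical sketch of a mapper that emits both a counter update and a status message on stderr (the group and counter names are made up for illustration):

#!/usr/bin/env python
# Illustrative only: update a counter and the task status from a streaming mapper.
import sys

records = 0
for line in sys.stdin:
    records += 1
    # Increment a user-defined counter by 1.
    sys.stderr.write("reporter:counter:MyGroup,RecordsSeen,1\n")
    sys.stdout.write(line)

# Report a final status message for the task.
sys.stderr.write("reporter:status:processed %d records\n" % records)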