Hadoop MapReduce Programming: Computing a Maximum Value
Computing a maximum with MapReduce is really no different from Hadoop's bundled WordCount example: one Reducer takes a maximum while the other accumulates a sum, but the structure is the same and quite simple. Let's work through a concrete example.

Test data
We generated a set of simple test samples with a small simulator program of our own. A fragment of the input data looks like this:
```
SG 25365 25365 619850464
KG 25365 25365 743485698
UZ 25365 25365 570409379
TT 25365 25365 23236775
BE 25365 25365 597874033
BO 25365 25365 498265375
SR 25365 25365 484613339
SV 25365 25365 629640166
LV 25365 25365 870680704
FJ 25365 25365 517965666
```
The text data above is stored one record per line, and each line contains 4 parts:
country code
start time
end time
random cost/weight estimate
The fields are separated by spaces. The result we want to compute is, for each country (identified by its country code), the maximum value of the cost estimate.
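The article's simulator program is not shown. As a hedged sketch only, a minimal generator producing records in the format described above might look like the following; the class name `CostDataGenerator`, the method `nextRecord`, and the choice of ISO country codes are all assumptions for illustration, not the author's original tool.

```java
import java.util.Locale;
import java.util.Random;

public class CostDataGenerator {

    private static final Random RANDOM = new Random();

    // Two-letter ISO country codes, matching codes like "SG" or "KG"
    // seen in the sample data.
    private static final String[] CODES = Locale.getISOCountries();

    // Build one record line: "countryCode startTime endTime cost",
    // with fields separated by single spaces.
    public static String nextRecord(long start, long end) {
        String code = CODES[RANDOM.nextInt(CODES.length)];
        // Random cost in [1, 999999999]; zero is excluded because the
        // Mapper later skips zero costs.
        long cost = 1 + (long) (RANDOM.nextDouble() * 999_999_998L);
        return code + " " + start + " " + end + " " + cost;
    }

    public static void main(String[] args) {
        // Print a few sample records.
        for (int i = 0; i < 5; i++) {
            System.out.println(nextRecord(25365, 25365));
        }
    }
}
```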
Implementation
Since the problem is simple, let's go straight to the code, which naturally splits into three parts: a Mapper, a Reducer, and a Driver. The Mapper implementation class is GlobalCostMapper:
```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GlobalCostMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private final static LongWritable costValue = new LongWritable(0);
    private Text code = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // a line, such as 'SG 25365 25365 619850464'
        String line = value.toString();
        String[] array = line.split("\\s");
        if (array.length == 4) {
            String countryCode = array[0];
            String strCost = array[3];
            long cost = 0L;
            try {
                cost = Long.parseLong(strCost);
            } catch (NumberFormatException e) {
                cost = 0L;
            }
            if (cost != 0) {
                code.set(countryCode);
                costValue.set(cost);
                context.write(code, costValue);
            }
        }
    }
}
```
The logic above is very simple: split each line on whitespace to separate out the fields, then emit a (countryCode, cost) key-value pair.
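The parsing step the Mapper performs can be checked in isolation, without a Hadoop cluster. The following is a minimal, Hadoop-free sketch of that same logic; the class `ParseDemo` and method name `parseCost` are assumptions for illustration.

```java
public class ParseDemo {

    // Returns {countryCode, cost} for a valid line, or null when the line
    // does not have exactly 4 fields, the cost is not a number, or the
    // cost is zero -- mirroring the checks in GlobalCostMapper.
    public static String[] parseCost(String line) {
        String[] array = line.split("\\s");
        if (array.length != 4) {
            return null;
        }
        try {
            long cost = Long.parseLong(array[3]);
            if (cost == 0) {
                return null;
            }
            return new String[] { array[0], String.valueOf(cost) };
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String[] kv = parseCost("SG 25365 25365 619850464");
        System.out.println(kv[0] + " -> " + kv[1]);  // SG -> 619850464
    }
}
```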
The key-value pairs emitted by the Mapper then need to be merged and reduced. The Reducer implementation class is GlobalCostReducer:
```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GlobalCostReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long max = 0L;
        Iterator<LongWritable> iter = values.iterator();
        while (iter.hasNext()) {
            LongWritable current = iter.next();
            if (current.get() > max) {
                max = current.get();
            }
        }
        context.write(key, new LongWritable(max));
    }
}
```
This computes the maximum cost estimate over the list of values for a key; the logic is straightforward. As an optimization, the same Reducer can also be applied after the map output as a Combiner, reducing the amount of data transferred from the Mappers to the Reducers; this is specified when configuring the Job.
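Reusing the Reducer as a Combiner is safe here because taking a maximum is associative: computing partial maxima on the map side and then the maximum of those partial results gives the same answer as a single pass over all values. A small sketch illustrating this; the helper name `maxOf` is an assumption for illustration.

```java
import java.util.Arrays;

public class CombinerSafetyDemo {

    // Same computation the Reducer performs over a group of values.
    public static long maxOf(long[] values) {
        long max = 0L;
        for (long v : values) {
            if (v > max) {
                max = v;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        long[] all = { 619850464L, 743485698L, 570409379L, 23236775L };

        // One pass over everything, as the Reducer would see without a Combiner.
        long direct = maxOf(all);

        // Two map-side partial maxima (as Combiners would emit),
        // then a reduce-side maximum of those partial results.
        long partial1 = maxOf(Arrays.copyOfRange(all, 0, 2));
        long partial2 = maxOf(Arrays.copyOfRange(all, 2, 4));
        long combined = maxOf(new long[] { partial1, partial2 });

        System.out.println(direct == combined);  // true
    }
}
```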
Finally, let's look at how to configure and run the Job. The driver class is GlobalMaxCostDriver:
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GlobalMaxCostDriver {

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: maxcost <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "maxcost");

        job.setJarByClass(GlobalMaxCostDriver.class);
        job.setMapperClass(GlobalCostMapper.class);
        // The Reducer doubles as the Combiner, as discussed above.
        job.setCombinerClass(GlobalCostReducer.class);
        job.setReducerClass(GlobalCostReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}
```
Running the program
First, make sure the Hadoop cluster is running normally; in my setup the NameNode is on host ubuntu3. The steps to run the program are:
Compile the code (I do this directly with Maven) and package it into a jar file
Copy the generated jar file to the NameNode host
Upload the data file to be processed
Run our MapReduce job to compute the maxima
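The four steps above can be sketched as shell commands. This is a hedged sketch only: the jar name `global-max-cost.jar`, the data file name, and the HDFS paths are assumptions for illustration, not taken from the original article.

```shell
# 1. Build and package the jar with Maven
mvn clean package

# 2. Copy the jar to the NameNode host (ubuntu3)
scp target/global-max-cost.jar ubuntu3:~/

# --- on ubuntu3 ---
# 3. Upload the input data file to HDFS
bin/hadoop fs -put cost-data.txt /user/xiaoxiang/input/cost

# 4. Run the job: input directory, then output directory
bin/hadoop jar global-max-cost.jar GlobalMaxCostDriver \
    /user/xiaoxiang/input/cost /user/xiaoxiang/output/cost
```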
The console output during the run looks roughly like this:
```
13/03/22 16:30:16: Total input paths to process : 1
13/03/22 16:30:16: Loaded the native-hadoop library
13/03/22 16:30:16: Snappy native library not loaded
13/03/22 16:30:16: Running job: job_201303111631_0004
13/03/22 16:30:17: map 0% reduce 0%
13/03/22 16:30:33: map 22% reduce 0%
13/03/22 16:30:36: map 28% reduce 0%
13/03/22 16:30:45: map 52% reduce 9%
13/03/22 16:30:48: map 57% reduce 9%
13/03/22 16:30:57: map 80% reduce 9%
13/03/22 16:31:00: map 85% reduce 19%
13/03/22 16:31:10: map 100% reduce 28%
13/03/22 16:31:19: map 100% reduce 100%
13/03/22 16:31:24: Job complete: job_201303111631_0004
13/03/22 16:31:24: Counters: 29
13/03/22 16:31:24:   Job Counters
13/03/22 16:31:24:     Launched reduce tasks=1
13/03/22 16:31:24:     SLOTS_MILLIS_MAPS=76773
13/03/22 16:31:24:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/22 16:31:24:     Total time spent by all maps waiting after reserving slots (ms)=0
13/03/22 16:31:24:     Launched map tasks=7
13/03/22 16:31:24:     Data-local map tasks=7
13/03/22 16:31:24:     SLOTS_MILLIS_REDUCES=40497
13/03/22 16:31:24:   File Output Format Counters
13/03/22 16:31:24:     Bytes Written=3029
13/03/22 16:31:24:   FileSystemCounters
13/03/22 16:31:24:     FILE_BYTES_READ=142609
13/03/22 16:31:24:     HDFS_BYTES_READ=448913653
13/03/22 16:31:24:     FILE_BYTES_WRITTEN=338151
13/03/22 16:31:24:     HDFS_BYTES_WRITTEN=3029
13/03/22 16:31:24:   File Input Format Counters
13/03/22 16:31:24:     Bytes Read=448912799
13/03/22 16:31:24:   Map-Reduce Framework
13/03/22 16:31:24:     Map output materialized bytes=21245
13/03/22 16:31:24:     Map input records=10000000
13/03/22 16:31:24:     Reduce shuffle bytes=18210
13/03/22 16:31:24:     Spilled Records=12582
13/03/22 16:31:24:     Map output bytes=110000000
13/03/22 16:31:24:     CPU time spent (ms)=80320
13/03/22 16:31:24:     Total committed heap usage (bytes)=…
13/03/22 16:31:24:     Combine input records=10009320
13/03/22 16:31:24:     SPLIT_RAW_BYTES=854
13/03/22 16:31:24:     Reduce input records=1631
13/03/22 16:31:24:     Reduce input groups=233
13/03/22 16:31:24:     Combine output records=10951
13/03/22 16:31:24:     Physical memory (bytes) snapshot=…
13/03/22 16:31:24:     Reduce output records=233
13/03/22 16:31:24:     Virtual memory (bytes) snapshot=4316872704
13/03/22 16:31:24:     Map output records=10000000
```
Verifying the Job output
```
xiaoxiang@ubuntu3:/opt/stone/cloud/$ bin/hadoop fs -cat /user/xiaoxiang/output/cost/part-r-00000
```
The 233 output pairs (country code, maximum cost) are wrapped into columns here for compactness:
```
AD 999974516   AE 999938630   AF 999996180   AG 999991085   AI 999989595
AL 999998489   AM 999976746   AO 999989628   AQ 999995031   AR 999953989
AS 999935982   AT 999999909   AU 999937089   AW 999965784   AZ 999996557
BA 999949773   BB 999987345   BD 999992272   BE 999925057   BF 999999220
BG 999971528   BH 999994900   BI 999978516   BJ 999977886   BM 999991925
BN 999986630   BO 999995482   BR 999989947   BS 999980931   BT 999977488
BW 999935985   BY 999998496   BZ 999975972   CA 999978275   CC 999968311
CD 999978139   CF 999995342   CG 999788112   CH 999997524   CI 999998864
CK 999968719   CL 999967083   CM 999998369   CN 999975367   CO 999999167
CR 999971685   CU 999976352   CV 999990543   CW 999987713   CX 999987579
CY 999982925   CZ 999993908   DE 999985416   DJ 999997438   DK 999963312
DM 999941706   DO 999945597   DZ 999973610   EC 999920447   EE 999949534
EG 999980522   ER 999980425   ES 999949155   ET 999987033   FI 999966243
FJ 999990686   FK 999966573   FM 999972146   FO 999988472   FR 999988342
GA 999982099   GB 999970658   GD 999996318   GE 999991970   GF 999982024
GH 999941039   GI 999995295   GL 999948726   GM 999967823   GN 999951804
GP 999904645   GQ 999988635   GR 999999672   GT 999972984   GU 999919056
GW 999962551   GY 999999881   HK 999970084   HN 999972628   HR 999986688
HT 999970913   HU 999997568   ID 999994762   IE 999996686   IL 999982184
IM 999987831   IN 999914991   IO 999968575   IQ 999990126   IR 999986780
IS 999973585   IT 999997239   JM 999982209   JO 999977276   JP 999983684
KE 999996012   KG 999991556   KH 999975644   KI 999994328   KM 999989895
KN 999991068   KP 999967939   KR 999992162   KW 999924295   KY 999977105
KZ 999992835   LA 999989151   LB 999963014   LC 999962233   LI 999986863
LK 999989876   LR 999897202   LS 999957706   LT 999999688   LU 999999823
LV 999945411   LY 999992365   MA 999922726   MC 999978886   MD 999996042
MG 999996602   MH 999989668   MK 999968900   ML 999990079   MM 999987977
MN 999969051   MO 999977975   MP 999995234   MQ 999913110   MR 999982303
MS 999974690   MT 999982604   MU 999988632   MV 999961206   MW 999991903
MX 999978066   MY 999995010   MZ 999981189   NA 999961177   NC 999961053
NE 999990091   NF 999989399   NG 999985037   NI 999965733   NL 999949789
NO 999993122   NP 999972410   NR 999956464   NU 999987046   NZ 999998214
OM 999967428   PA 999924435   PE 999981176   PF 999959978   PG 999987347
PH 999981534   PK 999954268   PL 999996619   PM 999998975   PR 999906386
PT 999993404   PW 999991278   PY 999985509   QA 999995061   RE 999952291
RO 999994148   RS 999999923   RU 999894985   RW 999980184   SA 999973822
SB 999972832   SC 999973271   SD 999963744   SE 999972256   SG 999977637
SH 999983638   SI 999980580   SK 999998152   SL 999999269   SM 999941188
SN 999990278   SO 999973175   SR 999975964   ST 999980447   SV 999999945
SX 999903445   SY 999988858   SZ 999992537   TC 999969540   TD 999999303
TG 999977640   TH 999968746   TJ 999983666   TK 999971131   TM 999958998
TN 999963035   TO 999947915   TP 999986796   TR 999995112   TT 999984435
TV 999971989   TW 999975092   TZ 999992734   UA 999970993   UG 999976267
UM 999998377   US 999912229   UY 999989662   UZ 999982762   VA 999975548
VC 999991495   VE 999997971   VG 999949690   VI 999990063   VN 999974393
VU 999953162   WF 999947666   WS 999970242   YE 999984650   YT 999994707
ZA 999998692   ZM 999973392   ZW 999928087
```
As you can see, the result is exactly what we expected. (Original author: 时延军; link: )
