- 浏览: 78068 次
- 性别:
- 来自: 南京
最新评论
-
amy929:
你好!我最近在做学mapreduce,可否发一份这个代码给我学 ...
MapReduce框架中矩阵相乘的算法思路及其实现 -
微笑春天:
楼主 你好 花了一晚上的时间看了下你这个算法的实现 说实话 我 ...
MapReduce框架中矩阵相乘的算法思路及其实现 -
gaycolour:
大大,同求完整代码!634677370@qq.com
MapReduce框架中矩阵相乘的算法思路及其实现 -
zarchary-10:
你好,同求完整代码,可否发份zzy07053437@163.c ...
MapReduce框架中矩阵相乘的算法思路及其实现 -
developerinit:
你好,最近也在研究mapreduce矩阵乘法,想看下你这个例子 ...
MapReduce框架中矩阵相乘的算法思路及其实现
主要实现思想在另一篇博客中已经提到:
具体实现每次迭代包括两个Job
第一个分散各个节点的PR值
第二个用于将dangling节点的PR值分散到其它节点
主要包括5个类
PageRankNode:图中的节点类-代表一个页面
PageRankJob:实现分散各个节点的PR值的类
DistributionPRMass:实现dangling节点的PR值分散到其它节点的Job类
RangePartitioner:partition类 将连续的节点分配到同一个reduce中
PageRankDirver:整个工作的驱动类(主函数)
package com.zxx.PageRank; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; public class PageRankJob { public static final double d = 0.85; private static final double nodecount = 10; private static final double threshold=0.01;//收敛邻接点 public static enum MidNodes { // 记录已经收敛的个数 Map, Reduce }; public static class PageRankMaper extends Mapper<Object, Text, Text, Text> { @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { PageRankNode node = PageRankNode.InstanceFormString(value.toString()); node.setOldPR(node.getNewPR()); context.write(new Text(node.getId()), new Text(PageRankNode.toStringWithOutID(node))); for (String str : node.getDestNodes()) { String outPR = new Double(node.getNewPR() / (double)node.getNumDest()).toString(); context.write(new Text(str), new Text(outPR)); } } } public static class PageRankJobReducer extends Reducer<Text, Text, Text, Text> { private double totalMass = Double.NEGATIVE_INFINITY; // 缓存每个key从其它点得到的全部PR值 private double missMass=Double.NEGATIVE_INFINITY; @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { PageRankNode currentNode = new PageRankNode(key.toString()); double inPR = 0.0; for (Text val : values) { String[] temp = val.toString().trim().split("\\s+"); if (temp.length == 1) // 此时候只输出一个PR值 { inPR += Double.valueOf(temp[0]); } else if (temp.length >= 4) {// 此时输出的是含有邻接点的节点全信息 currentNode = PageRankNode.InstanceFormString(key.toString() + "\t" + val.toString()); } else if (temp.length == 3) { // 此时输出的点没有出度 context.getCounter("PageRankJobReducer", "errornode").increment(1); currentNode=PageRankNode.InstanceFormString(key.toString() + "\t" + val.toString()); } } if (currentNode.getNumDest()>=1) { double newPRofD = (1 - PageRankJob.d) /(double) PageRankJob.nodecount + PageRankJob.d * inPR; currentNode.setNewPR(newPRofD); context.write(new Text(currentNode.getId()), new Text(PageRankNode.toStringWithOutID(currentNode))); }else if (currentNode.getNumDest()==0) { missMass=currentNode.getOldPR();//得到dangling节点的上一次的PR值,传播到下一个分布Pr的job } totalMass += inPR; double partPR=(currentNode.getNewPR()-currentNode.getOldPR())*(currentNode.getNewPR()-currentNode.getOldPR()); if (partPR<=threshold) { context.getCounter(MidNodes.Reduce).increment(1); } } @Override public void cleanup(Context context) throws IOException, InterruptedException { // 将total记录到文件中 Configuration conf = context.getConfiguration(); String taskId = conf.get("mapred.task.id"); String path = conf.get("PageRankMassPath");// 注意此处的path路径设置------------------ if (missMass==Double.NEGATIVE_INFINITY) { return; } FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataOutputStream out = fs.create(new Path(path + "/"+"missMass"), false); out.writeDouble(missMass); out.close(); } } }
package com.zxx.PageRank; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import com.zxx.Graph.ArrayListOfInts; import com.zxx.Graph.BFSNode; import com.zxx.Graph.HMapII; import com.zxx.Graph.MapII; import com.zxx.Graph.ReachableNodes; public class DistributionPRMass { public class GraphMapper extends Mapper<Object, Text, Text, Text> { private double missingMass = 0.0; private int nodeCnt = 0; @Override public void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); missingMass = (double)conf.getFloat("MissingMass", 0.0f);//该值等于1-totalMass nodeCnt = conf.getInt("NodeCount", 0); } @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { PageRankNode currentNode=PageRankNode.InstanceFormString(value.toString().trim()); currentNode.setOldPR(currentNode.getNewPR()); double p=currentNode.getNewPR(); double pnew=(1-PageRankJob.d)/(double)(nodeCnt-1)+PageRankJob.d*missingMass/(double)(nodeCnt-1); //double pnew=missingMass/(double)(nodeCnt-1); currentNode.setNewPR(p+pnew); context.write(new Text(currentNode.getId()), new Text(PageRankNode.toStringWithOutID(currentNode))); } @Override public void cleanup(Context context) throws IOException, InterruptedException { } } }
package com.zxx.PageRank; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Partitioner; public class RangePartitioner extends Partitioner<Text, Text> implements Configurable { private int nodeCnt = 0; private Configuration conf; public RangePartitioner() {} @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration arg0) { this.conf = arg0; configure(); } @Override public int getPartition(Text arg0, Text arg1, int arg2) { return (int) ((float)(Integer.parseInt(arg0.toString()) / (float) nodeCnt) * arg2) % arg2; } private void configure() //获得节点的总数 { nodeCnt = conf.getInt("NodeCount", 0); } }
package com.zxx.PageRank; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PageRankDirver { public static final int numNodes=5; //节点数 public static final int maxiter=10; //最大收敛次数 public static void main(String[] args) throws Exception { long count=0; //缓存已经接近收敛的节点个数 int it=1; int num=1; String input="/Graph/input/"; String output="/Graph/output1"; do{ Job job=getPageRankJob(input, output); job.waitForCompletion(true); Counters counter = job.getCounters(); count = counter.findCounter(PageRankJob.MidNodes.Reduce).getValue(); input="/Graph/output"+it; it++; output="/Graph/output"+it; Job job1=getDistrbuteJob(input,output); job1.waitForCompletion(true); input="/Graph/output"+it; it++; output="/Graph/output"+it; if(num<maxiter) System.out.println("it:"+it+" "+count); num++; }while(count!=numNodes); } public static Job getPageRankJob(String inPath,String outPath) throws Exception { Configuration conf = new Configuration(); Job job=new Job(conf,"PageRank job"); job.getConfiguration().setInt("NodeCount", numNodes); job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); job.getConfiguration().set("PageRankMassPath", "/mass"); job.setJarByClass(PageRankDirver.class); job.setNumReduceTasks(5); job.setMapperClass(PageRankJob.PageRankMaper.class); job.setReducerClass(PageRankJob.PageRankJobReducer.class); job.setPartitionerClass(RangePartitioner.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(inPath)); FileOutputFormat.setOutputPath(job, new Path(outPath)); FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除 return job; } public static Job getDistrbuteJob(String inPath,String outPath) throws Exception { Configuration conf = new Configuration(); Job job=new Job(conf,"Ditribute job"); double mass = Double.NEGATIVE_INFINITY; //一下是读取dangling节点的PR值,将其分配到其他节点 FileSystem fs = FileSystem.get(conf); for (FileStatus f : fs.listStatus(new Path("/mass/missMass"))) { FSDataInputStream fin = fs.open(f.getPath()); mass = fin.readDouble(); fin.close(); } job.getConfiguration().setFloat("MissingMass",(float)mass); job.getConfiguration().setInt("NodeCount", numNodes); job.getConfiguration().setInt("NodeCount", numNodes); job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); job.getConfiguration().set("PageRankMassPath", "/mass"); job.setJarByClass(PageRankDirver.class); job.setNumReduceTasks(5); job.setMapperClass(PageRankJob.PageRankMaper.class); job.setReducerClass(PageRankJob.PageRankJobReducer.class); job.setPartitionerClass(RangePartitioner.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(inPath)); FileOutputFormat.setOutputPath(job, new Path(outPath)); FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除 return job; } }
评论
6 楼
zhaohuiweixiao
2013-05-20
博主,输入文件的格式是什么样的呀?能举例说明一下吗?急用。谢谢!
5 楼
bleath2046
2013-04-15
博主你好,最近在学hadoop你上面程序的输入文件应该是设么样的,希望方便时告诉我一下,我邮箱是381568990@qq.com 谢谢!
4 楼
schaha
2012-07-02
前段时间忙,现在又开始看pagerank了。
楼主,程序我没看明白,在Hadoop环境中运行有错,求指导!
楼主,程序我没看明白,在Hadoop环境中运行有错,求指导!
3 楼
schaha123
2012-05-18
谢谢啊,这几天仔细研究一下。
2 楼
zxxapple
2012-05-18
实现很简单的
我在这里给你贴出来吧
我在这里给你贴出来吧
package com.zxx.PageRank; import java.util.*; import javax.naming.spi.DirStateFactory.Result; import javax.xml.soap.Node; public class PageRankNode { private String id; private List<String> destNodes=new ArrayList<String>(); private double oldPR; private double newPR; private int numDest; public PageRankNode() { } public PageRankNode(String id) { this.id=id; } public static String toStringWithOutID(PageRankNode node) { StringBuffer temp=new StringBuffer(); temp.append(node.getOldPR()); temp.append("\t"+node.getNewPR()); temp.append("\t"+node.getNumDest()); for(String dest:node.getDestNodes()) { temp.append("\t"+dest); } return temp.toString(); } public static PageRankNode InstanceFormString(String nodeStr) { PageRankNode node=new PageRankNode(); String[] res=nodeStr.split("\\s+"); node.setId(res[0]); if (res.length==2) { node.setNewPR(Double.valueOf(res[1])); }else if (res.length>4) { node.setOldPR(Double.valueOf(res[1])); node.setNewPR(Double.valueOf(res[2])); node.setNumDest(Integer.valueOf(res[3])); for (int i = 4; i < res.length; i++) { node.getDestNodes().add(res[i]); } assert(node.getNumDest()==node.getDestNodes().size()); } return node; } public String getId() { return id; } public void setId(String id) { this.id = id; } public List<String> getDestNodes() { return destNodes; } public void setDestNodes(List<String> destNodes) { this.destNodes = destNodes; } public double getOldPR() { return oldPR; } public void setOldPR(double oldPR) { this.oldPR = oldPR; } public double getNewPR() { return newPR; } public void setNewPR(double newPR) { this.newPR = newPR; } public int getNumDest() { return numDest; } public void setNumDest(int numDest) { this.numDest = numDest; } }
1 楼
schaha123
2012-05-18
您好,楼主,我现在正在做Nutch中实现PageRank算法,想看看你在MapReduce编程模式上是怎么实现的,好像少写一个类啊--PageRankNode类。
发表评论
-
MapReduce框架中全排序的算法思想--学习笔记
2012-02-14 14:13 7435关于全排序的问题 ... -
Mapreduce框架求Pi值的思路
2012-02-14 11:09 1429关于在mapreduce框架上求近似Pi的值,hadoop源码 ... -
MapReduce框架中矩阵相乘的算法思路及其实现
2012-02-14 10:36 8678关于在mapreduce框架中的两个矩阵相乘(A*B)的算法实 ... -
Mapreduce 编程测试
2012-02-13 22:32 1109最近对mapreduce框架比较感兴趣,所以会去测试许多例子, ... -
现阶段MapReduce框架 实现简单图的算法
2012-02-13 22:17 3210刚开始接触hadoop的mapreduce并行计算的编程框架, ...
相关推荐
实验内容 1. 采用基于“抽税”法在MapReduce框架下,分析图1的网页PageRank排名; 2. 图1中,若节点②和节点⑤是主题节点,采用面向主题的PageRank算法重新计算所有节点的PageRank值。
迷你谷歌由搜寻器,索引器,PageRank,MapReduce框架,搜索算法和前端组成。 搜寻器,索引器和pagerank作为mapreduce作业运行。 该系统利用了一个主节点和八个工作/数据节点,这些节点通过REST调用进行通信。
2.图1中,若节点②和节点⑤是主题节点,采用面向主题的PageRank算法重新计算所有节点的PageRank值。二、实验设计(原理分析及流程)三、实验代码及数据记录1.代码1.0 文件结构图1.1 ENode.javapackage ...
最后通过PageRank和SSSP算法设计实验,与MapReduce框架和采用HDFS作持久层的Spark框架进行性能对比。实验证明提出的框架要比MapReduce框架快90倍,比采用HDFS作持久层的Spark框架快2倍,能够满足高效率图数据处理的...
MapReduce 是一个使用简单的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错式并行处理TB级别的数据集。 来源 来源于pageRank也就是我们网页排名的问题;就像我们...
社交媒体网络在人们的日常生活中发挥着越来越重要的作用。... 在现实世界中的社交媒体网络和人工网络上的经验结果表明,新框架在准确性,速度和可伸缩性方面优于我们以前的工作和一些著名的算法,例如Radetal,FastGN。
并行化此类算法的现有技术使用诸如 MapReduce 和 Hadoop 之类的软件框架在集群中的多个基于 CPU 的工作站之间分发数据以进行迭代并收集每次迭代的结果。 这些平台的特点是需要在迭代边界同步数据计算,从而阻碍系统...
本篇英文论文通过三个具体实例(WordCount Sorted By Key, WordCount Sorted by Values 和 PageRank算法)来对比Hadoop 和 Spark 在大数据应用中运行时间,从而观察这些研究实例随着的迭代计算次数的增加,其时间...
针对现有的图处理和图管理框架存在的效率...最后通过PageRank和SSSP算法设计实验,与MapReduce框架和采用HDFS作持久层的Spark框架进行性能对比。实验证明提出的框架要比MapReduce框架快90倍,比采用HDFS作持久层的Spa
提出了一款基于Hadoop的并行数据分析系统―――PDM.该系统拥有大量以MapReduce...介绍了基于电信数据的典型应用,如采用并行k均值和决策树算法实现的“套餐推荐”,利用并行PageRank算法实现的“营销关键点发现”等;最后
前言 致谢 关于本书 第1 部分 背景和基本原理 1 跳跃中的Hadoop 1.1 什么是Hadoop 1.1.1 Hadoop 的核心组件 1.1.2 Hadoop 生态圈 1.1.3 物理架构 ...附录D 优化MapReduce 合并框架 索引 收起全部↑
数据科学.7 数据结构和算法的运用7.1 使用图进行数据建模和解决问题7.1.1 模拟图7.1.2 最短路径算法技术点52 找出两个用户间的最短距离7.1.3 friends-of-friends(FoF) 技术点53 计算FoF 7.1.4 ...
• PageRank 和田rs 算法是什么关系?有何异同? SALSA 算法是什么? Hilltop 算法又 是什么?各种链接分析算法之间是什么关系? • 如何识别搜索用户的真实搜索意图?用户搜索目的可以分为几类?什么是点击图? ...