Construct the conditional FP-tree:
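This gives the frequent itemsets for I5: {{I2,I5:2},{I1,I5:2},{I2,I1,I5:2}}. Mining I4 and I1 works the same way as I5, because their conditional FP-trees are also single paths.
(1) Mining I3: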
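The I5 case is relatively simple because its conditional FP-tree is a single path; I3 is slightly more involved. The conditional pattern base of I3 is (I2 I1:2), (I2:2), (I1:2), and the resulting conditional FP-tree is shown in the figure below: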
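The conditional FP-tree of I3 is still a multi-path tree. First, take the union of the pattern suffix I3 with each item in the header table of the conditional FP-tree, which gives the patterns {I2 I3:4, I1 I3:4}. These are not yet all the patterns with suffix I3, so FP-growth has to be called recursively with the pattern suffix {I1,I3}. The conditional pattern base of {I1,I3} is {I2:2}, and its conditional FP-tree is shown in the figure below.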
Inside FP_growth, the union of I2 with the pattern suffix {I1,I3} gives the pattern {I1 I2 I3:2}.
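The recursion over conditional pattern bases can be sketched in plain Scala without building a tree. This is only an illustration of the idea, not the Spark implementation: the object name `ConditionalPatternSketch`, the `PatternBase` type, and the `mine` function are all hypothetical, and a pattern base is represented directly as a list of prefix paths with counts.

```scala
object ConditionalPatternSketch {
  // A conditional pattern base: prefix paths (in FP-tree order) with their counts.
  type PatternBase = Seq[(List[String], Long)]

  // Returns every pattern that ends in `suffix` and has support >= minCount.
  def mine(base: PatternBase, suffix: List[String], minCount: Long): Seq[(List[String], Long)] = {
    // Support of each item inside this conditional pattern base.
    val support: Map[String, Long] = base
      .flatMap { case (path, count) => path.map(item => (item, count)) }
      .groupBy(_._1)
      .map { case (item, pairs) => (item, pairs.map(_._2).sum) }

    support.toSeq.filter(_._2 >= minCount).sortBy(_._1).flatMap { case (item, count) =>
      val pattern = item :: suffix
      // Conditional pattern base of the longer suffix: whatever precedes `item`
      // on each path that contains it (empty prefixes are dropped).
      val condBase: PatternBase = base.collect {
        case (path, c) if path.contains(item) && path.head != item =>
          (path.takeWhile(_ != item), c)
      }
      (pattern, count) +: mine(condBase, pattern, minCount)
    }
  }

  // Conditional pattern base of I3 from the text, with a minimum count of 2.
  val i3Base: PatternBase = Seq((List("I2", "I1"), 2L), (List("I2"), 2L), (List("I1"), 2L))
  val i3Patterns: Seq[(List[String], Long)] = mine(i3Base, List("I3"), 2L)
}
```

For the conditional pattern base of I3, `i3Patterns` holds the itemsets {I1, I3} with count 4, {I2, I1, I3} with count 2, and {I2, I3} with count 4.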
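In theory, the pattern set for the suffix {I2,I3} should be computed as well, but the conditional pattern base of {I2,I3} is empty, so the recursion stops here. The complete set of patterns with suffix I3 and support of at least 2 is therefore {I2 I3:4, I1 I3:4, I1 I2 I3:2}.
1.2 Spark MLlib FPGrowth source code analysis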
The FPGrowth source consists of two parts: FPGrowth and FPTree.
FPGrowth contains the methods run, genFreqItems, genFreqItemsets, and genCondTransactions.
FPTree contains the methods add, merge, project, getTransactions, and extract.
// run: compute the frequent itemsets
/**
 * Computes an FP-Growth model that contains frequent itemsets.
 * @param data input data set, each element contains a transaction
 * @return an [[FPGrowthModel]]
 */
def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item] = {
  if (data.getStorageLevel == StorageLevel.NONE) {
    logWarning("Input data is not cached.")
  }
  val count = data.count() // total number of transactions
  val minCount = math.ceil(minSupport * count).toLong // minimum support expressed as a count
  val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
  val partitioner = new HashPartitioner(numParts)
  // freqItems: the items that meet the minimum support
  val freqItems = genFreqItems(data, minCount, partitioner)
  // freqItemsets: the frequent itemsets
  val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
  new FPGrowthModel(freqItemsets)
}
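The conversion from minSupport to minCount in run is easy to check by hand. The snippet below repeats only that one formula in isolation; the object name `MinCountSketch` and the sample numbers are assumptions chosen for illustration.

```scala
object MinCountSketch {
  // Same formula as in run: minSupport is a fraction of all transactions,
  // and minCount is the smallest whole number of transactions that reaches it.
  def minCount(minSupport: Double, numTransactions: Long): Long =
    math.ceil(minSupport * numTransactions).toLong
}
```

With 6 transactions, a minSupport of 0.2 gives a minCount of 2 and a minSupport of 0.5 gives a minCount of 3. Because of the rounding up, an itemset has to appear in at least that many transactions to count as frequent.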
// genFreqItems: compute the items that meet the minimum support
/**
 * Generates frequent items by filtering the input data using minimal support level.
 * @param minCount minimum count for frequent itemsets
 * @param partitioner partitioner used to distribute items
 * @return array of frequent pattern ordered by their frequencies
 */
private def genFreqItems[Item: ClassTag](
    data: RDD[Array[Item]],
    minCount: Long,
    partitioner: Partitioner): Array[Item] = {
  data.flatMap { t =>
    val uniq = t.toSet
    if (t.size != uniq.size) {
      throw new SparkException(s"Items in a transaction must be unique but got ${t.toSeq}.")
    }
    t
  }.map(v => (v, 1L))
    .reduceByKey(partitioner, _ + _)
    .filter(_._2 >= minCount)
    .collect()
    .sortBy(-_._2)
    .map(_._1)
}
// Counts how often each item occurs, drops the items whose count is below minCount,
// and returns the remaining items sorted by descending count.
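The same count, filter, and sort pipeline can be tried on ordinary Scala collections, which may make the RDD version easier to follow. This is a sketch that mirrors the steps above with `groupBy` in place of `reduceByKey`; the object name `FreqItemsSketch` is hypothetical, and the transactions are the sample data used in section 1.3.

```scala
object FreqItemsSketch {
  // Sample transactions from section 1.3, one array of items per transaction.
  val sample: Seq[Array[String]] = Seq(
    "r z h k p", "z y x w v u t s", "s x o n r",
    "x z y m t s q e", "z", "x z y r q t p"
  ).map(_.split(" "))

  // Local-collection analogue of genFreqItems: count, filter, sort by descending count.
  def freqItems(transactions: Seq[Array[String]], minCount: Long): Array[String] =
    transactions.flatMap { t =>
      require(t.length == t.toSet.size,
        s"Items in a transaction must be unique but got ${t.toSeq}.")
      t.toSeq
    }.groupBy(identity)
      .map { case (item, occurrences) => (item, occurrences.size.toLong) }
      .filter(_._2 >= minCount)
      .toArray
      .sortBy(-_._2)
      .map(_._1)
}
```

Calling `FreqItemsSketch.freqItems(FreqItemsSketch.sample, 3L)` returns z (5 occurrences) first and x (4 occurrences) second, followed by r, y, s, and t, which all occur 3 times and therefore have no guaranteed order among themselves.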
// genFreqItemsets: compute the frequent itemsets by building FP-trees and mining them
/**
 * Generate frequent itemsets by building FP-Trees, the extraction is done on each partition.
 * @param data transactions
 * @param minCount minimum count for frequent itemsets
 * @param freqItems frequent items
 * @param partitioner partitioner used to distribute transactions
 * @return an RDD of (frequent itemset, count)
 */
private def genFreqItemsets[Item: ClassTag](
    data: RDD[Array[Item]],
    minCount: Long,
    freqItems: Array[Item],
    partitioner: Partitioner): RDD[FreqItemset[Item]] = {
  val itemToRank = freqItems.zipWithIndex.toMap // header table: item -> rank
  data.flatMap { transaction =>
    genCondTransactions(transaction, itemToRank, partitioner)
  }.aggregateByKey(new FPTree[Int], partitioner.numPartitions)( // build the FP-trees
    (tree, transaction) => tree.add(transaction, 1L), // add one transaction to an FP-tree
    (tree1, tree2) => tree1.merge(tree2)) // merge two FP-trees
  .flatMap { case (part, tree) =>
    tree.extract(minCount, x => partitioner.getPartition(x) == part) // mine the frequent itemsets from the FP-tree
  }.map { case (ranks, count) =>
    new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
  }
}
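genCondTransactions is called above but its body is not listed. The following is a simplified sketch of what such a step could look like, not the Spark source: it keeps the frequent items of a transaction as sorted ranks and, for each partition, emits the prefix that ends at the last rank belonging to that partition. The String item type, the `getPartition` function parameter, and the object name `CondTransactionsSketch` are assumptions made for illustration.

```scala
import scala.collection.mutable

object CondTransactionsSketch {
  def genCondTransactions(
      transaction: Array[String],
      itemToRank: Map[String, Int],
      getPartition: Int => Int): mutable.Map[Int, Array[Int]] = {
    val output = mutable.Map.empty[Int, Array[Int]]
    // Keep only frequent items, replace them by their ranks, and sort the ranks.
    val filtered = transaction.filter(itemToRank.contains).map(itemToRank).sorted
    // Walk from the least frequent item backwards; the first hit per partition
    // yields the longest prefix that this partition needs.
    var i = filtered.length - 1
    while (i >= 0) {
      val part = getPartition(filtered(i))
      if (!output.contains(part)) {
        output(part) = filtered.slice(0, i + 1)
      }
      i -= 1
    }
    output
  }
}
```

For example, with ranks z -> 0, x -> 1, y -> 2, two partitions chosen by `rank % 2`, and the transaction `x z y q`, partition 0 receives the ranks 0, 1, 2 and partition 1 receives the ranks 0, 1. Each partition therefore gets just enough of the transaction to mine the suffix items it is responsible for.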
// add: add one transaction to the FP-tree
/** Adds a transaction with count. */
def add(t: Iterable[T], count: Long = 1L): this.type = {
  require(count > 0)
  var curr = root
  curr.count += count
  t.foreach { item =>
    val summary = summaries.getOrElseUpdate(item, new Summary)
    summary.count += count
    val child = curr.children.getOrElseUpdate(item, {
      val newNode = new Node(curr)
      newNode.item = item
      summary.nodes += newNode
      newNode
    })
    child.count += count
    curr = child
  }
  this
}
// merge: merge another FP-tree into this one
/** Merges another FP-Tree. */
def merge(other: FPTree[T]): this.type = {
  other.transactions.foreach { case (t, c) =>
    add(t, c)
  }
  this
}
// extract: mine the FP-tree and return all frequent itemsets
/** Extracts all patterns with valid suffix and minimum count. */
def extract(
    minCount: Long,
    validateSuffix: T => Boolean = _ => true): Iterator[(List[T], Long)] = {
  summaries.iterator.flatMap { case (item, summary) =>
    if (validateSuffix(item) && summary.count >= minCount) {
      Iterator.single((item :: Nil, summary.count)) ++
        project(item).extract(minCount).map { case (t, c) =>
          (item :: t, c)
        }
    } else {
      Iterator.empty
    }
  }
}
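To see how add, project, and extract fit together, here is a small stand-alone tree that can be run without Spark. It is a simplified sketch under several assumptions, not the MLlib class: the name `MiniFPTree` is hypothetical, items are plain strings, the per-item summary is split into two maps, and merge and validateSuffix are left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for FPTree; items are plain strings.
class MiniFPTree {
  private class Node(val item: String, val parent: Node) {
    var count = 0L
    val children = mutable.Map.empty[String, Node]
  }

  private val root = new Node(null, null)
  // Header table: total count per item and the nodes that carry the item.
  private val counts = mutable.LinkedHashMap.empty[String, Long]
  private val nodes = mutable.Map.empty[String, mutable.ListBuffer[Node]]

  /** Adds a transaction (items already in frequency order) with a count. */
  def add(t: Iterable[String], count: Long = 1L): this.type = {
    require(count > 0)
    var curr = root
    t.foreach { item =>
      counts(item) = counts.getOrElse(item, 0L) + count
      val child = curr.children.getOrElseUpdate(item, {
        val newNode = new Node(item, curr)
        nodes.getOrElseUpdate(item, mutable.ListBuffer.empty[Node]) += newNode
        newNode
      })
      child.count += count
      curr = child
    }
    this
  }

  /** Conditional tree of `suffix`: the prefix path of every node that holds it. */
  private def project(suffix: String): MiniFPTree = {
    val tree = new MiniFPTree
    nodes.getOrElse(suffix, mutable.ListBuffer.empty[Node]).foreach { node =>
      var path = List.empty[String]
      var curr = node.parent
      while (curr.parent != null) { // stop at the root
        path = curr.item :: path
        curr = curr.parent
      }
      if (path.nonEmpty) tree.add(path, node.count)
    }
    tree
  }

  /** All patterns with count >= minCount; the suffix item comes first in each list. */
  def extract(minCount: Long): List[(List[String], Long)] =
    counts.toList.flatMap { case (item, count) =>
      if (count >= minCount) {
        (item :: Nil, count) ::
          project(item).extract(minCount).map { case (t, c) => (item :: t, c) }
      } else {
        Nil
      }
    }
}
```

Loading the conditional pattern base of I3 from section 1.1 with `new MiniFPTree().add(List("I2", "I1"), 2L).add(List("I2"), 2L).add(List("I1"), 2L)` and calling `extract(2L)` yields I2 with count 4, I1 with count 4, and the pair I1, I2 with count 2. Appending the suffix I3 to each gives the same three patterns that were derived by hand.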
1.3 MLlib FPGrowth example
1. Data
The data format is one transaction per line, with items separated by spaces: item1 item2 item3 …
r z h k p
z y x w v u t s
s x o n r
x z y m t s q e
z
x z y r q t p
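2. Code (run in the Spark shell)
import org.apache.spark.mllib.fpm.FPGrowth

// Read the sample data
val data_path = "/home/tmp/sample_fpgrowth.txt"
val data = sc.textFile(data_path)
val examples = data.map(_.split(" ")).cache()
// Build the model
val minSupport = 0.2 // minimum support as a fraction of all transactions
val numPartition = 10
val model = new FPGrowth()
  .setMinSupport(minSupport)
  .setNumPartitions(numPartition)
  .run(examples)
// Print the results
println(s"Number of frequent itemsets: ${model.freqItemsets.count()}")
model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}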