
Customizing PredictionIO Similar Product Template to recommend users


PredictionIO is an open source machine learning server from the Apache Software Foundation. It provides engine templates that can be used to quickly deploy a full-scale recommendation engine.

These templates can easily be customized to suit your requirements.

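In this blog post we will modify the Similar Product Template. Out of the box, it recommends items similar to a given set of items. We will change it so that it recommends users similar to a given set of users; in other words, we will turn item-to-item recommendation into user-to-user recommendation. Instead of "view" events (a user views an item), the engine will learn from "follow" events (a user follows another user).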

Note: This post assumes that you have the PredictionIO Event Server up and running.


Step 1: Create a new engine from the Similar Product engine template.



$ git clone https://github.com/apache/predictionio-template-similar-product.git userRecommendation


$ cd userRecommendation


Step 2: Open Engine.scala and modify the following lines:

We will change the Query class so that it takes a list of users instead of a list of items, and we will remove the categories field.




case class Query(
  users: List[String],
  num: Int,
  whiteList: Option[Set[String]],
  blackList: Option[Set[String]]
) extends Serializable



Remove the ItemScore class and create a new class named SimilarUserScore in its place. PredictedResult must then hold an array of SimilarUserScore instead of ItemScore:


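case class SimilarUserScore(
  user: String,
  score: Double
) extends Serializable

// PredictedResult now returns similar users
// (field name assumed; it becomes the key in the JSON response)
case class PredictedResult(
  similarUserScores: Array[SimilarUserScore]
) extends Serializable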
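The modified types can be tried outside PredictionIO. Below is a minimal plain-Scala sketch that copies Query, SimilarUserScore and an assumed PredictedResult without the framework imports. It shows which Query values correspond to a JSON body such as `{"users": ["u1"], "num": 4}`. It assumes that omitted optional keys arrive as `None`, as they do for the template's own optional query fields. `QueryShapes` is a hypothetical name used only for illustration.

```scala
// Plain-Scala copies of the modified Engine.scala types (no PredictionIO
// imports), used only to illustrate how query bodies map onto them.
case class Query(
  users: List[String],
  num: Int,
  whiteList: Option[Set[String]],
  blackList: Option[Set[String]]
) extends Serializable

case class SimilarUserScore(user: String, score: Double) extends Serializable

// Assumed shape of the modified PredictedResult (field name is an assumption).
case class PredictedResult(similarUserScores: Array[SimilarUserScore]) extends Serializable

// Hypothetical helper object holding example queries.
object QueryShapes {
  // {"users": ["u1"], "num": 4}: no whiteList/blackList keys, so both are None
  val minimal = Query(users = List("u1"), num = 4, whiteList = None, blackList = None)

  // Same query, but user "u7" must never be recommended
  val withBlackList = minimal.copy(blackList = Some(Set("u7")))
}
```

The white and black lists stay as user ID strings in the query; predict() later translates them into the model's integer indices.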




Step 3: Open DataSource.scala and modify the following lines:

We will replace the ViewEvent case class with a FollowEvent case class and remove the Item case class.




case class FollowEvent(user: String, followedUser: String, t: Long)




Now we will modify the TrainingData class to use the newly created FollowEvent class:




class TrainingData(
  val users: RDD[(String, User)],
  val followEvents: RDD[FollowEvent] // MODIFIED
) extends Serializable {
  override def toString = {
    s"users: [${users.count()} (${users.take(2).toList}...)]" +
    s"followEvents: [${followEvents.count()}] (${followEvents.take(2).toList}...)"
  }
}






The next modification in this file is to the readTraining() function. It should read "follow" events instead of "view" events, and the RDD of (entityId, Item) should be removed.




  def readTraining(sc: SparkContext): TrainingData = {

    // create a RDD of (entityID, User)
    val usersRDD: RDD[(String, User)] = …

    // get all "user" "follow" "followedUser" events
    val followEventsRDD: RDD[FollowEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("follow")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("user")))(sc)
      // PEventStore.find() returns RDD[Event]
      .map { event =>
        val followEvent = try {
          event.event match {
            case "follow" => FollowEvent(
              user = event.entityId,
              followedUser = event.targetEntityId.get,
              t = event.eventTime.getMillis)
            case _ => throw new Exception(s"Unexpected event $event is read.")
          }
        } catch {
          case e: Exception => {
            logger.error(s"Cannot convert $event to FollowEvent." +
              s" Exception: $e.")
            throw e
          }
        }
        followEvent
      }.cache()

    new TrainingData(
      users = usersRDD,
      followEvents = followEventsRDD // MODIFIED
    )
  }
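The event-to-FollowEvent conversion in readTraining() can be checked without Spark. Every event it accepts corresponds to an Event Server event with `event` set to "follow", `entityType` "user" and `targetEntityType` "user". The sketch below assumes a hypothetical `RawEvent` stand-in for PredictionIO's `Event` class, carrying only the fields the conversion reads. It returns an `Either` instead of throwing; that is a choice of this sketch, not what the code above does.

```scala
// Hypothetical stand-in for PredictionIO's Event: only the fields that
// readTraining() reads (event name, source user, target user, time).
case class RawEvent(
  event: String,
  entityId: String,
  targetEntityId: Option[String],
  eventTimeMillis: Long
)

case class FollowEvent(user: String, followedUser: String, t: Long)

object FollowEventConversion {
  // Mirrors the match in readTraining(): "follow" events with a target user
  // become FollowEvent; anything else is reported as a Left instead of thrown.
  def toFollowEvent(e: RawEvent): Either[String, FollowEvent] = e.event match {
    case "follow" =>
      e.targetEntityId match {
        case Some(target) => Right(FollowEvent(e.entityId, target, e.eventTimeMillis))
        case None         => Left(s"follow event without a target user: $e")
      }
    case other => Left(s"Unexpected event $other is read.")
  }
}
```

Returning a `Left` lets a caller count or log bad events and drop them. The DataSource code above instead fails the whole read on the first bad event.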






Step 4: Open Preparator.scala and modify the following lines:

The Preparator is responsible for passing the training data on to the algorithm that finds similar users.

We will modify it to pass the follow events to the algorithm as prepared data.

Modify the Preparator's prepare() method:



  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(
      users = trainingData.users,
      followEvents = trainingData.followEvents) // MODIFIED
  }





Now we will modify the PreparedData class:



class PreparedData(
  val users: RDD[(String, User)],
  val followEvents: RDD[FollowEvent] // MODIFIED
) extends Serializable




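Step 5: Modify ALSAlgorithm.scala:


This class is the core of our recommendation engine. We will modify the ALSModel class to store similar users instead of items. The code in this step expects its fields to be named similarUserFeatures, similarUserStringIntMap and similarUsers. It also expects similarUserIntStringMap as the inverse of similarUserStringIntMap, the role itemIntStringMap plays in the template. We need to modify two methods, namely train() and predict().


Modify the train() method to train on the follow events: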



  def train(sc: SparkContext, data: PreparedData): ALSModel = {
    require(!data.followEvents.take(1).isEmpty,
      s"followEvents in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    require(!data.users.take(1).isEmpty,
      s"users in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")

    // create User String ID to integer index BiMap
    val userStringIntMap = BiMap.stringInt(data.users.keys)
    // followed users are users too, so both sides share one index
    val similarUserStringIntMap = userStringIntMap

    // collect SimilarUser as Map and convert ID to Int index
    val similarUsers: Map[Int, User] = data.users.map { case (id, similarUser) =>
      (similarUserStringIntMap(id), similarUser)
    }.collectAsMap().toMap

    val mllibRatings = data.followEvents
      .map { r =>
        // Convert user and followed-user String IDs to Int index for MLlib
        val uindex = userStringIntMap.getOrElse(r.user, -1)
        val iindex = similarUserStringIntMap.getOrElse(r.followedUser, -1)

        if (uindex == -1)
          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
            + " to Int index.")

        if (iindex == -1)
          logger.info(s"Couldn't convert nonexistent followedUser ID ${r.followedUser}"
            + " to Int index.")

        ((uindex, iindex), 1)
      }.filter { case ((u, i), v) =>
        // keep events with valid user and followed-user index
        (u != -1) && (i != -1)
      }
      .map { case ((u, i), v) =>
        // MLlibRating requires integer index for user and followed user
        MLlibRating(u, i, v)
      }

    // MLLib ALS cannot handle empty training data.
    require(!mllibRatings.take(1).isEmpty,
      s"mllibRatings cannot be empty." +
      " Please check if your events contain valid user and followedUser ID.")

    // seed for MLlib ALS
    val seed = ap.seed.getOrElse(System.nanoTime)

    val m = ALS.trainImplicit(
      ratings = mllibRatings,
      rank = ap.rank,
      iterations = ap.numIterations,
      lambda = ap.lambda,
      blocks = -1,
      alpha = 1.0,
      seed = seed)

    new ALSModel(
      similarUserFeatures = m.productFeatures.collectAsMap().toMap,
      similarUserStringIntMap = similarUserStringIntMap,
      similarUsers = similarUsers
    )
  }
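A followed user is itself a user, so train() reuses one string-to-integer index for both sides of a follow event. That is why similarUserStringIntMap is simply userStringIntMap. The plain-Scala sketch below shows that mapping without Spark or PredictionIO. It assumes `indexUsers`, a hypothetical stand-in for `BiMap.stringInt`, and uses `(Int, Int, Double)` triples in place of `MLlibRating`. Every follow counts as an implicit rating of 1, which `ALS.trainImplicit` treats as a confidence signal rather than an explicit score.

```scala
object FollowRatings {
  case class FollowEvent(user: String, followedUser: String, t: Long)

  // Hypothetical stand-in for BiMap.stringInt: one Int index per distinct user ID.
  def indexUsers(userIds: Seq[String]): Map[String, Int] =
    userIds.distinct.zipWithIndex.toMap

  // Map BOTH ends of each follow event through the same user index, drop
  // events that mention unknown users, and emit (user, followedUser, 1.0)
  // triples standing in for MLlibRating(u, i, v).
  def toRatings(events: Seq[FollowEvent], index: Map[String, Int]): Seq[(Int, Int, Double)] =
    events.flatMap { e =>
      for {
        u <- index.get(e.user)
        i <- index.get(e.followedUser)
      } yield (u, i, 1.0)
    }
}
```

Follows that point at a user missing from the users RDD are dropped, just as the filter in train() drops indices equal to -1.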






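Modify the predict() method to return similar users. The template's cosine() and getTopN() helpers can be reused unchanged. Its isCandidateItem() helper has to be adapted to users by dropping the categories check: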



  def predict(model: ALSModel, query: Query): PredictedResult = {

    val similarUserFeatures = model.similarUserFeatures

    // convert query users to Int index (unknown users are skipped)
    val queryList: Set[Int] = query.users.flatMap(model.similarUserStringIntMap.get(_)).toSet

    val queryFeatures: Vector[Array[Double]] = queryList.toVector
      // similarUserFeatures may not contain the requested user
      .map { similarUser => similarUserFeatures.get(similarUser) }
      .flatten

    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
      set.flatMap(model.similarUserStringIntMap.get(_))
    )

    val blackList: Option[Set[Int]] = query.blackList.map ( set =>
      set.flatMap(model.similarUserStringIntMap.get(_))
    )

    val ord = Ordering.by[(Int, Double), Double](_._2).reverse

    val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) {
      logger.info(s"No similarUserFeatures vector for query users ${query.users}.")
      Array[(Int, Double)]()
    } else {
      similarUserFeatures.par // convert to parallel collection
        .mapValues { f =>
          // sum of cosine similarities to every query user's vector
          queryFeatures.map { qf =>
            cosine(qf, f)
          }.reduce(_ + _)
        }
        .filter(_._2 > 0) // keep similarUsers with score > 0
        .seq // convert back to sequential collection
        .toArray
    }

    val filteredScore = indexScores.view.filter { case (i, v) =>
      // helper name assumed: the template's isCandidateItem, adapted to users
      isCandidateSimilarUser(
        i = i,
        similarUsers = model.similarUsers,
        queryList = queryList,
        whiteList = whiteList,
        blackList = blackList
      )
    }

    val topScores = getTopN(filteredScore, query.num)(ord).toArray

    val similarUserScores = topScores.map { case (i, s) =>
      SimilarUserScore(
        user = model.similarUserIntStringMap(i),
        score = s
      )
    }

    PredictedResult(similarUserScores)
  }
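The scoring in predict() can be illustrated without Spark. A candidate's score is the sum of its cosine similarities to the query users' feature vectors, and non-positive scores are dropped. The query users themselves, plus any black-listed or non-white-listed users, are filtered out before the top `num` are taken. The sketch below assumes a plain `Map[Int, Array[Double]]` standing in for similarUserFeatures. `topSimilar` and `isCandidate` are hypothetical names, and it uses a full sort where the template's getTopN uses a bounded priority queue.

```scala
object SimilarUserScoring {
  // Cosine similarity of two equal-length vectors; 0.0 if either has zero norm.
  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
    val dot = v1.zip(v2).map { case (a, b) => a * b }.sum
    val n1 = math.sqrt(v1.map(x => x * x).sum)
    val n2 = math.sqrt(v2.map(x => x * x).sum)
    if (n1 == 0.0 || n2 == 0.0) 0.0 else dot / (n1 * n2)
  }

  // Candidate check in the spirit of isCandidateItem minus categories:
  // honour the white/black lists and never return a query user.
  def isCandidate(
      i: Int,
      queryUsers: Set[Int],
      whiteList: Option[Set[Int]],
      blackList: Option[Set[Int]]): Boolean =
    whiteList.forall(_.contains(i)) &&
      blackList.forall(b => !b.contains(i)) &&
      !queryUsers.contains(i)

  // Score every user against the query users and return the `num` best
  // (index, score) pairs, highest score first (ties broken by index).
  def topSimilar(
      features: Map[Int, Array[Double]],
      queryUsers: Set[Int],
      num: Int,
      whiteList: Option[Set[Int]] = None,
      blackList: Option[Set[Int]] = None): Seq[(Int, Double)] = {
    val queryVecs = queryUsers.toSeq.flatMap(u => features.get(u).toSeq)
    if (queryVecs.isEmpty) Seq.empty
    else
      features.toSeq
        .map { case (i, f) => (i, queryVecs.map(q => cosine(q, f)).sum) }
        .filter { case (i, score) => score > 0 && isCandidate(i, queryUsers, whiteList, blackList) }
        .sortWith((a, b) => a._2 > b._2 || (a._2 == b._2 && a._1 < b._1))
        .take(num)
  }
}
```

A full sort is simple but costs O(n log n) over all users. For large user bases, a bounded top-N selection like the template's getTopN avoids sorting everything.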









Step 6: Test the engine:


  1. Build the source code by running:
     $ pio build
  2. Train the engine by running:
     $ pio train
  3. Deploy the engine by running:
     $ pio deploy


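Now we can query the engine by sending a POST request with a JSON body (by default, pio deploy serves queries on port 8000):

$ curl -H "Content-Type: application/json" \
-d '{ "users": ["u1"], "num": 4 }' \
http://localhost:8000/queries.json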


















With these customizations, the Similar Product template recommends similar users instead of similar items.