Skip to content
This repository was archived by the owner on Jan 14, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Created by .ignore support plugin (hsz.mobi)
### SBT template
# Simple Build Tool
# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control

.idea/
dist/*
*/target/
**/target/
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
.history
.cache
.lib/

28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Completar el scrapper, topicos y casos.

1. Crear endponints y sus tests.
- [X] /media
- [x] test autenticación.
- [X] /following
- [x] test autenticación.
- [X] /followers
- [x] test autenticación.
2. Completar el scrapper del tópico `topic.userMedia`.
- [X] Schema.
- [X] Procesar el mediaType 1. #10
- [X] Basic integration test to kafka #11
- [ ] Procesar mediaType 8.
3. Completar el scrapper del tópico `topic.appender`.
- [ ] Schema.
- [ ] Procesar MediaRequest.
- [ ] Procesar UserRequest.
4. Completar el scrapper del tópico `topic.user`.
- [ ] Schema.
- [ ] Procesar User.
5. Completar el scrapper del tópico `topic.userFollowing` y `user.userFollowers`.
- [ ] Schema.
- [ ] pipeline to user .
7. DB persistence.
- [ ] Media
- [ ] Followers
- [ ] Following
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ lazy val `ig-http-api` = (project in file("ig-http-api"))
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full)
lazy val `ig-crawler` = (project in file("ig-crawler"))
.settings(
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.3.0",
libraryDependencies += "com.github.pathikrit" %% "better-files" % "3.8.0",
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.3.0",
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "2.3.0" % Test,
libraryDependencies ++= (http4sLibraries ++ circeLibraries),
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full),
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "2.3.0" % Test
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full)
)
.dependsOn(commons)
.aggregate(commons)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.github.devcdcc
import io.circe.Json
import io.circe.generic.auto._, io.circe.parser._, io.circe.syntax._

package object domain {

sealed trait QueueRequest {
def userId: String
def requestType: Option[String]
//def requestType: Option[String]
def next_max_id: Option[String]
def hasNext: Option[Boolean]
def requestId: Option[String]
Expand All @@ -17,7 +16,7 @@ package object domain {

case class MediaRequest(
userId: String,
requestType: Option[String] = Some("media"),
//requestType: Option[String] = Some("media"),
next_max_id: Option[String] = None,
hasNext: Option[Boolean] = None,
requestId: Option[String] = None,
Expand All @@ -27,7 +26,7 @@ package object domain {

case class UserRequest(
userId: String,
requestType: Option[String] = None,
//requestType: Option[String] = None,
next_max_id: Option[String] = None,
hasNext: Option[Boolean] = None,
requestId: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package com.github.devcdcc.helpers

import com.typesafe.config.{Config, ConfigFactory}

object TopicsHelper {
trait TopicsHelper {
def config: Config = ConfigFactory.load()
val parseErrorTopicLabel: String = config.getString("topics.parseErrorLabel")
val appenderTopic: String = config.getString("topics.scrapper.appender")
val dataTopic: String = config.getString("topics.scrapper.data")
val userScrapperTopic: String = config.getString("topics.scrapper.user")
val userMediaScrapperTopic: String = config.getString("topics.scrapper.userMedia")
val mediaElementScrapperTopic: String = config.getString("topics.scrapper.mediaElement")
Expand All @@ -14,3 +15,4 @@ object TopicsHelper {
val userFollowersScrapperTopic: String = config.getString("topics.scrapper.userFollowers")
val followersElementScrapperTopic: String = config.getString("topics.scrapper.followersElement")
}
object TopicsHelper extends TopicsHelper {}
1 change: 1 addition & 0 deletions ig-crawler/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ topics{
parseErrorLabel = "parseE rrors"
scrapper{
appender = "topic.appender"
data = "topic.data"
user = ${?TOPIC_USER}
user = "topic.users"
userMedia = ${?TOPIC_USER_MEDIA}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.github.devcdcc.crawler.api

package object exception {

type CrawlerException = Exception
case class NextElementNotFoundException(message: String = "Element not found") extends CrawlerException(message)
case class ConverterNotFoundException(message: String = "Element not found") extends CrawlerException(message)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ package com.github.devcdcc.crawler.consumer

import java.util.Properties

import com.github.devcdcc.crawler.consumer.builder.AbstractBuilder
import com.github.devcdcc.crawler.consumer.builder.{AbstractBuilder, AppenderBuilder}
import com.github.devcdcc.crawler.consumer.builder.processor.MediaScrapperBuilder
import com.github.devcdcc.crawler.consumer.converters.media.{
AbstractMediaConverter,
CarouselMediaConverter,
SimpleMediaConverter
}
import com.github.devcdcc.crawler.consumer.converters.request.{AbstractRequestConverter, MediaRequestConverter}
import com.github.devcdcc.helpers.TopicsHelper._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.{StreamsConfig, Topology}

abstract class OrchestrationTrait[K] {
val converters: List[AbstractMediaConverter] = List(new SimpleMediaConverter, new CarouselMediaConverter)
val mediaConverters: List[AbstractMediaConverter] = List(new SimpleMediaConverter, new CarouselMediaConverter)
val requestConverters: List[AbstractRequestConverter] = List(new MediaRequestConverter)

val props: Properties = {
val p = new Properties()
Expand All @@ -26,10 +28,14 @@ abstract class OrchestrationTrait[K] {
p
}

val builder: StreamsBuilder = new StreamsBuilder
val scrappers: List[AbstractBuilder[String, String]] = List(new MediaScrapperBuilder(builder, converters))
lazy val build: Topology = builder.build()
scrappers.foreach(_.transact)
val builder: StreamsBuilder = new StreamsBuilder

val builders: List[AbstractBuilder[String, String]] = List(
new AppenderBuilder(builder = builder, converters = requestConverters),
new MediaScrapperBuilder(builder, mediaConverters)
)
lazy val build: Topology = builder.build()
builders.foreach(_.transact)

val kafkaStreams: K
def start(): Unit
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.github.devcdcc.crawler.consumer.builder

import io.circe.Json
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

trait AbstractBuilder[K, V] {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,95 @@
package com.github.devcdcc.crawler.consumer.builder

import com.github.devcdcc.crawler.api.exception._
import com.github.devcdcc.crawler.consumer.converters.request.AbstractRequestConverter
import com.github.devcdcc.domain
import com.github.devcdcc.domain.QueueRequest
import com.github.devcdcc.helpers.TopicsHelper
import io.circe.Json
import io.circe.syntax._
import io.circe.generic.auto._

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import io.circe.parser.decode
import io.circe.syntax._
import io.circe.parser.decode
import io.circe.syntax._

class AppenderBuilder[T <: domain.QueueRequest](
builder: StreamsBuilder,
override val topic: String = TopicsHelper.appenderTopic,
stream: Option[KStream[String, String]] = None,
converters: List[AbstractRequestConverter[T]])
extends AbstractBuilder[String, String] {
converters: List[AbstractRequestConverter])
extends BasicJsonStringBuilder(builder = builder, topic = topic, stream = stream) {

private[builder] def doRequest(original: QueueRequest): scala.Either[scala.Throwable, io.circe.Json] =
converters.find(_.isRequiredType(original)) match {
case None => Left(new NoSuchElementException("AbstractRequestConverter can't be found"))
case Some(converter) => converter.doRequest(original)
}

val topicStream: KStream[String, String] = stream.getOrElse(builder.stream(topic))
sealed case class NextBuild(original: QueueRequest, response: Either[Throwable, QueueRequest]);
private[builder] def getNextRequest(original: QueueRequest, response: Json): NextBuild = NextBuild(
original,
if (original.hasNext.contains(false))
Left(NextElementNotFoundException())
else
converters.find(_.isRequiredType(original)) match {
case None => Left(new NoSuchElementException("AbstractRequestConverter can't be found"))
case Some(converter) => converter.convert(original, response)
}
)

// private val parseStream = topicStream.mapValues((key, value) => )
private[builder] def jsonToRequest: Json => Either[Json, domain.QueueRequest] = { json =>
json.as[domain.QueueRequest].fold(_ => Left(json), request => Right(request))
}

// private val parseStream = topicStream.mapValues((key, value) => )
private lazy val queueRequestStream = parseSuccess(jsonStream).mapValues(jsonToRequest)
private lazy val failedQueueRequest =
queueRequestStream.flatMapValues((key, value) => value.swap.toOption).mapValues(_.noSpaces)
private lazy val httpResponseStream = validQueueRequest.mapValues(
request => doRequest(request).fold(fail => Left(request, fail), value => Right(request, value))
)
def validQueueRequest = queueRequestStream.flatMapValues((key, value) => value.toOption)
/*

*/
override def transact: Unit = {
super.transact
val x = ""
failedQueueRequest.to(s"$topic.error.parsing")
httpResponseStream
.flatMapValues((_, value) => value.swap.toOption)
.mapValues((key, error) => error._1.asJson.noSpaces)
.to(s"$topic.error.request")
val validResponse = httpResponseStream
.flatMapValues((_, value) => value.toOption)
validResponse
.mapValues(_._2.noSpaces)
.to(TopicsHelper.dataTopic)
val nextResponse = validResponse.mapValues((key, response) => validResponseToNextElement(response))
nextResponse
.filterNot(removeFinalElements)
.flatMapValues(_.swap.toOption)
.mapValues(_._1.asJson.noSpaces)
.to(s"$topic.error.next")
nextResponse.flatMapValues(_.toOption).mapValues(_.asJson.noSpaces).to(topic)
}

private def removeFinalElements(key: String, either: Either[(QueueRequest, Throwable), QueueRequest]) =
either match {
case Left((_, NextElementNotFoundException(_))) => true
case _ => false
}

private def validResponseToNextElement(
response: (QueueRequest, Json)
): Either[(QueueRequest, Throwable), QueueRequest] = {
val (request, value) = response
getNextRequest(request, value) match {
case NextBuild(original, Left(failure)) =>
Left(original, failure)
case NextBuild(original, Right(nextBuild)) =>
Right(nextBuild)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.github.devcdcc.crawler.consumer.builder.processor

import io.circe.Json

trait JsonFlatter {
sealed trait JsonFlatter {
protected def flatter(json: Json): Iterable[Json]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.github.devcdcc.crawler.consumer.converters.media

import com.github.devcdcc.crawler.consumer.converters.AbstractConverter
import io.circe.Json
import services.random.RandomGenerator

trait AbstractMediaConverter extends AbstractConverter[Int, Json, Json => Json] {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package com.github.devcdcc.crawler.consumer.converters.media
import io.circe.Json
import cats.syntax.either._
import io.circe._
import io.circe.parser._
import io.circe.optics.JsonPath._
import services.random.RandomGeneratorObject
import io.circe.generic.auto._
import io.circe.syntax._

class SimpleMediaConverter extends AbstractMediaConverter {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ import org.http4s.client.{Client, JavaNetClientBuilder}
import org.http4s.circe.CirceEntityDecoder._
import scala.concurrent.ExecutionContext

abstract class AbstractRequestConverter[A <: QueueRequest]
extends AbstractConverter[Option[String], A, (A, Json) => A]
with AbstractRequester[A, Either[Throwable, Json]] {
abstract class AbstractRequestConverter
extends AbstractConverter[Option[Any], QueueRequest, (QueueRequest, Json) => Either[Throwable, QueueRequest]]
with AbstractRequester[QueueRequest, Either[Throwable, Json]] {
implicit val ec: ExecutionContext = AbstractRequestConverter.ec
val httpClient: Client[IO] = AbstractRequestConverter.httpClient
val elementType: Option[Any] = None

override def isRequiredType(input: A): Boolean = input.requestType == elementType

override def doRequest(a: A): Either[Throwable, Json] = {
override def doRequest(a: QueueRequest): Either[Throwable, Json] = {
val query = Query.empty :+ ("next_max_id", a.next_max_id)
val uri = Uri(path = a.requestURl, query = query)
httpClient.expect[Json](uri).attempt.unsafeRunSync()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
package com.github.devcdcc.crawler.consumer.converters.request

import com.github.devcdcc.domain
import com.github.devcdcc.domain.QueueRequest
import io.circe.Json

class MediaRequestConverter extends AbstractRequestConverter[domain.MediaRequest] {
import scala.runtime.Nothing$

class MediaRequestConverter extends AbstractRequestConverter {

val elementType: Option[String] = Some("media")
override def isRequiredType(input: QueueRequest): Boolean = input.isInstanceOf[domain.MediaRequest]

def convert: (domain.MediaRequest, Json) => domain.MediaRequest =
(request, json) => {
request.copy(
scrapperId = json.hcursor.downField("scrapperId").as[String].toOption,
next_max_id = json.hcursor.downField("next_max_id").as[String].toOption,
hasNext = json.hcursor.downField("more_available").as[Boolean].toOption
)
def convert: (domain.QueueRequest, Json) => Either[Throwable, domain.QueueRequest] =
(originalRequest, json) => {
val moreAvailable = json.hcursor.downField("more_available").as[Boolean].toOption
if (moreAvailable.isDefined) {
Right(
originalRequest
.asInstanceOf[domain.MediaRequest]
.copy(
scrapperId = json.hcursor.downField("scrapperId").as[String].toOption,
next_max_id = json.hcursor.downField("next_max_id").as[String].toOption,
hasNext = moreAvailable
)
)
} else Left(new NoSuchElementException("field more_available is false"))
}
}
Loading