
Python: PySpark reduceByKey with a (key, Dictionary) tuple

October 24, 2024 · 飞鱼

I am running into some difficulty trying to map-reduce over a block of data with Spark. I want to process log files and reduce them down to a (key, dict()) tuple.

But I keep running into errors, and I am not sure this is the right approach; any advice would be welcome. As the end result, I want everything mapped to (key, dict(values)).

Here are my mapper and reducer:

from collections import defaultdict 
 
 
a = {u'1058493694': {'age': {u'25': 1}, 'areacode': {'unknown': 1}, 'catg': {'unknown': 1}, 'city': {'unknown': 1}, 'country': {'unknown': 1}, 'ethnicity': {'unknown': 1}, 'gender': {u'u': 1}, 'geo_data': {'unknown': 1}, 'user_ip': {u'149.6.187.*': 1}}} 
b = {u'1058493694': {'age': {u'25': 1}, 'areacode': {'unknown': 1}, 'catg': {'unknown': 1}, 'city': {'London': 1}, 'country': {'unknown': 1}, 'ethnicity': {'unknown': 1}, 'gender': {u'Male': 1}, 'geo_data': {'unknown': 1}, 'user_ip': {u'149.6.187.*': 1}}} 
 
def getValueFromJson(json_obj, field): 
  # return {value: 1} for the field, or {'unknown': 1} if it is missing, empty or unreadable 
  try: 
    raw = dict(json_obj) 
    if field in raw: 
        if raw[field]: 
          return {raw[field]: 1} 
  except: 
    return {'unknown': 1} 
  return {'unknown': 1} 
 
def mapper(line): 
  # line is expected to be an already-parsed JSON record (a dict) 
  attr = dict(defaultdict()) 
  user_id = line.get("user_id", "unknown") 
  user_attr = ["age", "gender"] 
  location_attr = ["city", "state", "post_code", "country", "areacode", "user_ip", "geo_data"] 
 
  # combine both lists 
  attr_list = user_attr + location_attr 
 
  for item in attr_list: 
    attr[item] = getValueFromJson(line, item) 
 
  return (user_id, attr) 
 
def reducer(a, b): 
  # merge two attribute dicts for the same user_id by summing per-value counts 
  results = dict() 
  for key in a.keys(): 
    val = dict() 
    for k in a[key].keys() + b[key].keys(): 
      val[k] = a[key].get(k, 0) + b[key].get(k, 0) 
    results[key] = val 
  return results 
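
For what it's worth, the pairwise merge the reducer is meant to perform can be sanity-checked locally, without Spark. The sketch below is an editorial addition, not part of the original question; its field names and counts are illustrative only, and it uses set unions so it does not depend on Python 2's list-returning dict.keys():

# Minimal local sketch of the pairwise merge reduceByKey performs. 
def merge_attrs(a, b): 
  # sum the per-value counts of each field across two attribute dicts 
  merged = {} 
  for field in set(a) | set(b): 
    counts = {} 
    for val in set(a.get(field, {})) | set(b.get(field, {})): 
      counts[val] = a.get(field, {}).get(val, 0) + b.get(field, {}).get(val, 0) 
    merged[field] = counts 
  return merged 

left = {'gender': {u'u': 1}, 'city': {'unknown': 1}} 
right = {'gender': {u'Male': 1}, 'city': {'London': 1}} 
print(merge_attrs(left, right)) 
# {'city': {'unknown': 1, 'London': 1}, 'gender': {u'u': 1, u'Male': 1}} 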

I am not sure whether I can use the reducer the way I have it. Any help on best practices for achieving this would be greatly appreciated.

user_pairs = imps_data.map(extractUserData) 
user_totals = user_pairs.reduceByKey(joinUserData) 
user_totals.take(25) 
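
As an editorial aside, one way to separate logic errors from data errors is to run the same mapper and reducer over a tiny in-memory sample first. A minimal sketch under Python 2 (matching the environment in the traceback), assuming sc is the active SparkContext and that the notebook's extractUserData and joinUserData wrap the mapper and reducer shown above (per the traceback, extractUserData additionally calls json.loads on the raw line):

sample = [ 
  {"user_id": u"1058493694", "age": u"25", "gender": u"u", "user_ip": u"149.6.187.*"}, 
  {"user_id": u"1058493694", "city": u"London", "gender": u"Male", "user_ip": u"149.6.187.*"}, 
] 
# already-parsed dicts, so no JSON decoding is involved here 
pairs = sc.parallelize(sample).map(mapper) 
print(pairs.reduceByKey(reducer).collect()) 
# -> [(u'1058493694', {...merged per-field value counts...})] 

The traceback below comes from the real pipeline run over the full log files, not from this sketch.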

Then I get the following error:

--------------------------------------------------------------------------- 
Py4JJavaError                             Traceback (most recent call last) 
<ipython-input-461-e1b93b972cac> in <module>() 
      1 user_pairs = imps_data.map(extractUserData) 
      2 user_totals = user_pairs.reduceByKey(joinUserData) 
----> 3 user_totals.take(25) 
 
/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in take(self, num) 
   1263  
   1264             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) 
-> 1265             res = self.context.runJob(self, takeUpToNumLeft, p, True) 
   1266  
   1267             items += res 
 
/home/ubuntu/databricks/spark/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal) 
    895         mappedRDD = rdd.mapPartitions(partitionFunc) 
    896         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions, 
--> 897                                           allowLocal) 
    898         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 
    899  
 
/home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536         answer = self.gateway_client.send_command(command) 
    537         return_value = get_return_value(answer, self.gateway_client, 
--> 538                 self.target_id, self.name) 
    539  
    540         for temp_arg in temp_args: 
 
/home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298                 raise Py4JJavaError( 
    299                     'An error occurred while calling {0}{1}{2}.\n'. 
--> 300                     format(target_id, '.', name), value) 
    301             else: 
    302                 raise Py4JError( 
 
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 688.0 failed 4 times, most recent failure: Lost task 6.3 in stage 688.0 (TID 8308, 10.179.246.224): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
  File "/home/ubuntu/databricks/spark/python/pyspark/worker.py", line 111, in main 
    process() 
  File "/home/ubuntu/databricks/spark/python/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
  File "/home/ubuntu/databricks/spark/python/pyspark/rdd.py", line 2318, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
  File "/home/ubuntu/databricks/spark/python/pyspark/rdd.py", line 2318, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
  File "/home/ubuntu/databricks/spark/python/pyspark/rdd.py", line 304, in func 
    return f(iterator) 
  File "/home/ubuntu/databricks/spark/python/pyspark/rdd.py", line 1746, in combineLocally 
    merger.mergeValues(iterator) 
  File "/home/ubuntu/databricks/spark/python/pyspark/shuffle.py", line 266, in mergeValues 
    for k, v in iterator: 
  File "<ipython-input-456-90f3cdb37d50>", line 5, in extractUserData 
  File "/usr/lib/python2.7/json/__init__.py", line 338, in loads 
    return _default_decoder.decode(s) 
  File "/usr/lib/python2.7/json/decoder.py", line 366, in decode 
    obj, end = self.raw_decode(s, idx=_w(s, 0).end()) 
  File "/usr/lib/python2.7/json/decoder.py", line 382, in raw_decode 
    obj, end = self.scan_once(s, idx) 
ValueError: Unterminated string starting at: line 1 column 733 (char 732) 
 
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

Many thanks, C

Please refer to the following approach:

Thanks for the suggestions. That turned out to be exactly it.

I simply added a filter to the RDD that drops lines containing invalid JSON:

data = sc.textFile(files, 20).filter(lambda line: isJson(line.split("||")[0])) 
 
import json 
 
def isJson(myjson): 
  # return True only if the string parses as valid JSON 
  try: 
    json.loads(myjson) 
  except ValueError: 
    return False 
  return True
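
A possible variant, not what was actually run above, is to fold the parsing and the filtering into a single flatMap pass so each line's JSON is decoded only once; downstream functions would then receive parsed dicts rather than raw lines. The files variable and the "||" delimiter are carried over from the snippet above:

import json 

def parseValidJson(line): 
  # yield the parsed dict for lines whose JSON decodes cleanly; skip the rest 
  try: 
    yield json.loads(line.split("||")[0]) 
  except ValueError: 
    pass 

data = sc.textFile(files, 20).flatMap(parseValidJson) 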